asterixdb-commits mailing list archives

From amo...@apache.org
Subject [6/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
Date Sun, 15 May 2016 19:03:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
deleted file mode 100644
index d11c3de..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedFrameSpiller {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
-
-    private final IHyracksTaskContext ctx;
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor policyAccessor;
-    private BufferedOutputStream bos;
-    private File file;
-    private boolean fileCreated = false;
-    private long bytesWritten = 0;
-    private int spilledFrameCount = 0;
-
-    public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            FeedPolicyAccessor policyAccessor) throws HyracksDataException {
-        this.ctx = ctx;
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.policyAccessor = policyAccessor;
-    }
-
-    public boolean processMessage(ByteBuffer message) throws HyracksDataException {
-        if (!fileCreated) {
-            createFile();
-            fileCreated = true;
-        }
-        long maxAllowed = policyAccessor.getMaxSpillOnDisk();
-        if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + message.array().length > maxAllowed) {
-            return false;
-        } else {
-            try {
-                bos.write(message.array());
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-            bytesWritten += message.array().length;
-            spilledFrameCount++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
-            }
-            return true;
-        }
-    }
-
-    private void createFile() throws HyracksDataException {
-        try {
-            Date date = new Date();
-            String dateSuffix = date.toString().replace(' ', '_');
-            String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
-                    + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
-
-            file = new File(fileName);
-            if (!file.exists()) {
-                boolean success = file.createNewFile();
-                if (!success) {
-                    throw new HyracksDataException(
-                            "Unable to create spill file " + fileName + " for feed " + runtimeId);
-                } else {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Created spill file " + file.getAbsolutePath());
-                    }
-                }
-            }
-            bos = new BufferedOutputStream(new FileOutputStream(file));
-        } catch (Throwable th) {
-            throw new HyracksDataException(th);
-        }
-    }
-
-    public Iterator<ByteBuffer> replayData() throws Exception {
-        bos.flush();
-        return new FrameIterator(ctx, file.getName());
-    }
-
-    private static class FrameIterator implements Iterator<ByteBuffer> {
-
-        private final BufferedInputStream bis;
-        private final IHyracksTaskContext ctx;
-        private int readFrameCount = 0;
-
-        public FrameIterator(IHyracksTaskContext ctx, String filename) throws FileNotFoundException {
-            bis = new BufferedInputStream(new FileInputStream(new File(filename)));
-            this.ctx = ctx;
-        }
-
-        @Override
-        public boolean hasNext() {
-            boolean more = false;
-            try {
-                more = bis.available() > 0;
-                if (!more) {
-                    bis.close();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            return more;
-        }
-
-        @Override
-        public ByteBuffer next() {
-            IFrame frame = null;
-            try {
-                frame = new VSizeFrame(ctx);
-                Arrays.fill(frame.getBuffer().array(), (byte) 0);
-                bis.read(frame.getBuffer().array(), 0, frame.getFrameSize());
-                readFrameCount++;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Read spill frome " + readFrameCount);
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            return frame.getBuffer();
-        }
-
-        @Override
-        public void remove() {
-        }
-
-    }
-
-    public void reset() {
-        bytesWritten = 0;
-        //  file.delete();
-        fileCreated = false;
-        bos = null;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Resetted the FrameSpiller!");
-        }
-    }
-
-    public void close() {
-        if (bos != null) {
-            try {
-                bos.flush();
-                bos.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}

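For context on what the removed FeedFrameSpiller did: it appended incoming frames to a spill file on disk and later replayed them in write order. Below is a minimal, self-contained sketch of that spill-and-replay pattern in plain Java; SimpleFrameSpiller and its fixed frame size are illustrative and not part of the AsterixDB codebase.

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class SimpleFrameSpiller {
        private final File file;
        private final int frameSize;
        private final BufferedOutputStream out;

        public SimpleFrameSpiller(String name, int frameSize) throws IOException {
            this.file = File.createTempFile(name, ".spill"); // name must be at least 3 chars
            this.frameSize = frameSize;
            this.out = new BufferedOutputStream(new FileOutputStream(file));
        }

        // Append one fixed-size frame to the spill file.
        public void spill(ByteBuffer frame) throws IOException {
            out.write(frame.array(), 0, frameSize);
        }

        // Replay every spilled frame, in the order it was written.
        public List<ByteBuffer> replay() throws IOException {
            out.flush();
            List<ByteBuffer> frames = new ArrayList<>();
            try (DataInputStream in = new DataInputStream(
                    new BufferedInputStream(new FileInputStream(file)))) {
                while (in.available() > 0) {
                    byte[] buf = new byte[frameSize];
                    in.readFully(buf); // read exactly one frame
                    frames.add(ByteBuffer.wrap(buf));
                }
            }
            return frames;
        }

        public void close() throws IOException {
            out.close();
        }
    }
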
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
deleted file mode 100644
index d3897f3..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
-
-    private final FrameTupleAccessor frameAccessor;
-    private final int numOpenFields;
-
-    public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
-        this.frameAccessor = frameAccessor;
-        int firstRecordStart = frameAccessor.getTupleStartOffset(0) + frameAccessor.getFieldSlotsLength();
-        int openPartOffsetOrig = frameAccessor.getBuffer().getInt(firstRecordStart + 6);
-        numOpenFields = frameAccessor.getBuffer().getInt(firstRecordStart + openPartOffsetOrig);
-    }
-
-    public int getFeedIntakePartition(int tupleIndex) {
-        ByteBuffer buffer = frameAccessor.getBuffer();
-        int recordStart = frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
-        int openPartOffsetOrig = buffer.getInt(recordStart + 6);
-        int partitionOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
-        return buffer.getInt(recordStart + partitionOffset);
-    }
-
-
-
-    @Override
-    public int getFieldCount() {
-        return frameAccessor.getFieldCount();
-    }
-
-    @Override
-    public int getFieldSlotsLength() {
-        return frameAccessor.getFieldSlotsLength();
-    }
-
-    @Override
-    public int getFieldEndOffset(int tupleIndex, int fIdx) {
-        return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getFieldLength(int tupleIndex, int fIdx) {
-        return frameAccessor.getFieldLength(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getTupleEndOffset(int tupleIndex) {
-        return frameAccessor.getTupleEndOffset(tupleIndex);
-    }
-
-    @Override
-    public int getTupleStartOffset(int tupleIndex) {
-        return frameAccessor.getTupleStartOffset(tupleIndex);
-    }
-
-    @Override
-    public int getTupleCount() {
-        return frameAccessor.getTupleCount();
-    }
-
-    @Override
-    public ByteBuffer getBuffer() {
-        return frameAccessor.getBuffer();
-    }
-
-    @Override
-    public void reset(ByteBuffer buffer) {
-        frameAccessor.reset(buffer);
-    }
-
-    @Override
-    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
-        return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getTupleLength(int tupleIndex) {
-        return frameAccessor.getTupleLength(tupleIndex);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
deleted file mode 100644
index d43f90d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public class FeedFrameTupleDecorator {
-
-    private AMutableString aString = new AMutableString("");
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AtomicInteger tupleId;
-
-    @SuppressWarnings("unchecked")
-    private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    @SuppressWarnings("unchecked")
-    private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-    @SuppressWarnings("unchecked")
-    private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-
-    private final int partition;
-    private final ArrayBackedValueStorage attrNameStorage;
-    private final ArrayBackedValueStorage attrValueStorage;
-
-    public FeedFrameTupleDecorator(int partition) {
-        this.tupleId = new AtomicInteger(0);
-        this.partition = partition;
-        this.attrNameStorage = new ArrayBackedValueStorage();
-        this.attrValueStorage = new ArrayBackedValueStorage();
-    }
-
-    public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
-            throws HyracksDataException, AsterixException {
-        attrNameStorage.reset();
-        aString.setValue(attrName);
-        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
-
-        attrValueStorage.reset();
-        aInt64.setValue(attrValue);
-        int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
-
-        recordBuilder.addField(attrNameStorage, attrValueStorage);
-    }
-
-    public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
-            throws HyracksDataException, AsterixException {
-        attrNameStorage.reset();
-        aString.setValue(attrName);
-        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
-
-        attrValueStorage.reset();
-        aInt32.setValue(attrValue);
-        int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
-
-        recordBuilder.addField(attrNameStorage, attrValueStorage);
-    }
-
-    public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, tupleId.incrementAndGet(), recordBuilder);
-    }
-
-    public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
-    }
-
-    public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
-    }
-
-    public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
-        addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
-    }
-
-}

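The removed decorator repeats one small pattern throughout: reset a reusable byte storage, serialize a name and a value into it, then hand both to the record builder. The following is a rough plain-Java analogue of that reset-then-serialize pattern using JDK streams instead of the AsterixDB serdes; AttributeEncoder is an illustrative name, not an existing class.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class AttributeEncoder {
        // Reusable storages, mirroring the decorator's attrNameStorage/attrValueStorage.
        private final ByteArrayOutputStream nameStorage = new ByteArrayOutputStream();
        private final DataOutputStream nameOut = new DataOutputStream(nameStorage);
        private final ByteArrayOutputStream valueStorage = new ByteArrayOutputStream();
        private final DataOutputStream valueOut = new DataOutputStream(valueStorage);

        public void encodeLongAttribute(String name, long value) throws IOException {
            nameStorage.reset();       // reuse the backing buffer rather than reallocate
            nameOut.writeUTF(name);    // serialize the attribute name
            valueStorage.reset();
            valueOut.writeLong(value); // serialize the attribute value
            // The real class now passes both storages to IARecordBuilder.addField(name, value).
        }
    }
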
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index dcd03a9..3920a03 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -19,450 +19,369 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
-import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.dataflow.DataBucket.ContentType;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.watch.MonitoredBuffer;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /**
+ * TODO: Add unit test cases for this class
  * Provides for error-handling and input-side buffering for a feed runtime.
- * The input handler is buffering in:
- * 1. FeedMetaComputeNodePushable.initializeNewFeedRuntime();
- * 2. FeedMetaStoreNodePushable.initializeNewFeedRuntime();
- *              ______
- *             |      |
- * ============| core |============
- * ============|  op  |============
- * ^^^^^^^^^^^^|______|
- * Input Side
- * Handler
+ * .............______.............
+ * ............|......|............
+ * ============|(core)|============
+ * ============|( op )|============
+ * ^^^^^^^^^^^^|______|............
+ * .Input Side.
+ * ..Handler...
  **/
-public class FeedRuntimeInputHandler implements IFrameWriter {
+public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
-
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor feedPolicyAccessor;
+    private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
+    private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
     private final FeedExceptionHandler exceptionHandler;
-    private final FeedFrameDiscarder discarder;
-    private final FeedFrameSpiller spiller;
+    private final FrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
-    private final IFeedManager feedManager;
-    private final MonitoredBuffer mBuffer;
-    private final DataBucketPool pool;
-    private final FrameEventCallback frameEventCallback;
-
-    private boolean bufferingEnabled;
-    private FrameCollection frameCollection;
-    private Mode mode;
-    private Mode lastMode;
-    private boolean finished;
-    private long nProcessed;
-    private boolean throttlingEnabled;
-    protected IFrameWriter coreOperator;
+    private final FrameAction frameAction;
+    private final int initialFrameSize;
+    private final FrameTransporter consumer;
+    private final Thread consumerThread;
+    private final LinkedBlockingDeque<ByteBuffer> inbox;
+    private final ConcurrentFramePool memoryManager;
+    private Mode mode = Mode.PROCESS;
+    private int numDiscarded = 0;
+    private int numSpilled = 0;
+    private int numProcessedInMemory = 0;
+    private int numStalled = 0;
 
     public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
-            RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws HyracksDataException {
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.coreOperator = coreOperator;
-        this.bufferingEnabled = bufferingEnabled;
-        this.feedPolicyAccessor = fpa;
-        this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
-        this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this);
-        this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
-        this.mode = Mode.PROCESS;
-        this.lastMode = Mode.PROCESS;
-        this.finished = false;
+            IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool feedMemoryManager)
+            throws HyracksDataException {
+        this.writer = writer;
+        this.spiller =
+                new FrameSpiller(ctx,
+                        connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
+                                + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition(),
+                        fpa.getMaxSpillOnDisk());
+        this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
-        this.feedManager = feedManager;
-        this.pool = (DataBucketPool) feedManager.getFeedMemoryManager()
-                .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
-        this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
-                .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
-        this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
-        this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
-                feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
-                nPartitions, fpa);
-        this.throttlingEnabled = false;
+        this.memoryManager = feedMemoryManager;
+        this.inbox = new LinkedBlockingDeque<>();
+        this.consumer = new FrameTransporter();
+        this.consumerThread = new Thread(consumer);
+        this.consumerThread.start();
+        this.initialFrameSize = ctx.getInitialFrameSize();
+        this.frameAction = new FrameAction(inbox);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        synchronized (writer) {
+            writer.open();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        synchronized (writer) {
+            writer.fail();
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        inbox.add(POISON_PILL);
+        synchronized (this) {
+            notify();
+        }
+        try {
+            consumerThread.join();
+        } catch (InterruptedException e) {
+            LOGGER.log(Level.WARNING, e.getMessage(), e);
+        }
+        try {
+            memoryManager.release(inbox);
+        } catch (Throwable th) {
+            LOGGER.log(Level.WARNING, th.getMessage(), th);
+        }
+        try {
+            spiller.close();
+        } catch (Throwable th) {
+            LOGGER.log(Level.WARNING, th.getMessage(), th);
+        }
+        writer.close();
     }
 
     @Override
-    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
         try {
+            if (consumer.cause() != null) {
+                throw consumer.cause();
+            }
             switch (mode) {
                 case PROCESS:
-                    switch (lastMode) {
-                        case SPILL:
-                        case POST_SPILL_DISCARD:
-                            setMode(Mode.PROCESS_SPILL);
-                            processSpilledBacklog();
-                            break;
-                        case STALL:
-                            setMode(Mode.PROCESS_BACKLOG);
-                            processBufferredBacklog();
-                            break;
-                        default:
-                            break;
-                    }
-                    process(frame);
-                    break;
-                case PROCESS_BACKLOG:
-                case PROCESS_SPILL:
                     process(frame);
                     break;
                 case SPILL:
                     spill(frame);
                     break;
                 case DISCARD:
-                case POST_SPILL_DISCARD:
                     discard(frame);
                     break;
-                case STALL:
-                    switch (runtimeId.getFeedRuntimeType()) {
-                        case COLLECT:
-                        case COMPUTE_COLLECT:
-                        case COMPUTE:
-                        case STORE:
-                            bufferDataUntilRecovery(frame);
-                            break;
-                        default:
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
-                            }
-                            break;
-                    }
-                    break;
-                case END:
-                case FAIL:
+                default:
                     if (LOGGER.isLoggable(Level.WARNING)) {
                         LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
                     }
                     break;
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
     }
 
-    private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
-        }
-        if (frameCollection == null) {
-            this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
-                    .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
+    private ByteBuffer getFreeBuffer(int frameSize) throws HyracksDataException {
+        int numFrames = frameSize / initialFrameSize;
+        if (numFrames == 1) {
+            return memoryManager.get();
+        } else {
+            return memoryManager.get(frameSize);
         }
-        if (frameCollection == null) {
-            discarder.processMessage(frame);
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Running low on memory! DISCARDING FRAME ");
+    }
+
+    private void discard(ByteBuffer frame) throws HyracksDataException {
+        if (fpa.spillToDiskOnCongestion()) {
+            if (spiller.spill(frame)) {
+                numSpilled++;
+                mode = Mode.SPILL;
+                return;
             }
         } else {
-            boolean success = frameCollection.addFrame(frame);
-            if (!success) {
-                if (fpa.spillToDiskOnCongestion()) {
-                    if (frame != null) {
-                        spiller.processMessage(frame);
-                    } // TODO handle the else casec
-
-                } else {
-                    discarder.processMessage(frame);
-                }
+            ByteBuffer next = getFreeBuffer(frame.capacity());
+            if (next != null) {
+                numProcessedInMemory++;
+                next.put(frame);
+                inbox.offer(next);
+                mode = Mode.PROCESS;
+                return;
             }
         }
+        numDiscarded++;
     }
 
-    public void reportUnresolvableCongestion() throws HyracksDataException {
-        if (this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE)) {
-            FeedCongestionMessage congestionReport = new FeedCongestionMessage(connectionId, runtimeId,
-                    mBuffer.getInflowRate(), mBuffer.getOutflowRate(), mode);
-            feedManager.getFeedMessageService().sendMessage(congestionReport);
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
-            }
+    private synchronized void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+        if (fpa.spillToDiskOnCongestion()) {
+            mode = Mode.SPILL;
+            spiller.open();
+            spill(frame);
         } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
-            }
+            discardOrStall(frame);
+        }
+    }
+
+    private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
+        if (fpa.discardOnCongestion()) {
+            numDiscarded++;
+            mode = Mode.DISCARD;
+            discard(frame);
+        } else {
+            stall(frame);
         }
     }
 
-    private void processBufferredBacklog() throws HyracksDataException {
+    private void stall(ByteBuffer frame) throws HyracksDataException {
         try {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Processing backlog " + this.runtimeId);
+            numStalled++;
+            // If spilling is enabled, we wait on the spiller
+            if (fpa.spillToDiskOnCongestion()) {
+                synchronized (spiller) {
+                    while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
+                        spiller.wait();
+                    }
+                }
+                spiller.spill(frame);
+                synchronized (this) {
+                    notify();
+                }
+                return;
             }
-
-            if (frameCollection != null) {
-                Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
-                while (backlog.hasNext()) {
-                    process(backlog.next());
-                    nProcessed++;
+            // Spilling is disabled, we subscribe to feedMemoryManager
+            frameAction.setFrame(frame);
+            synchronized (frameAction) {
+                if (memoryManager.subscribe(frameAction)) {
+                    frameAction.wait();
                 }
-                DataBucket bucket = pool.getDataBucket();
-                bucket.setContentType(ContentType.EOSD);
-                bucket.setDesiredReadCount(1);
-                mBuffer.sendMessage(bucket);
-                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
-                frameCollection = null;
             }
-        } catch (Exception e) {
-            e.printStackTrace();
+            synchronized (this) {
+                notify();
+            }
+        } catch (InterruptedException e) {
             throw new HyracksDataException(e);
         }
     }
 
-    private void processSpilledBacklog() throws HyracksDataException {
-        try {
-            Iterator<ByteBuffer> backlog = spiller.replayData();
-            while (backlog.hasNext()) {
-                process(backlog.next());
-                nProcessed++;
+    private void process(ByteBuffer frame) throws HyracksDataException {
+        // Get a free buffer from the feed memory manager (null if the budget is exhausted).
+        ByteBuffer next = getFreeBuffer(frame.capacity());
+        if (next != null) {
+            numProcessedInMemory++;
+            next.put(frame);
+            inbox.offer(next);
+            if (inbox.size() == 1) {
+                synchronized (this) {
+                    notify();
+                }
             }
-            DataBucket bucket = pool.getDataBucket();
-            bucket.setContentType(ContentType.EOSD);
-            bucket.setDesiredReadCount(1);
-            mBuffer.sendMessage(bucket);
-            spiller.reset();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
+        } else {
+            // Out of memory. Switch to the next mode as dictated by the policy (synchronized method).
+            exitProcessState(frame);
         }
     }
 
-    protected void process(ByteBuffer frame) throws HyracksDataException {
-        boolean frameProcessed = false;
-        while (!frameProcessed) {
-            try {
-                if (!bufferingEnabled) {
-                    if (frame == null) {
-                        setFinished(true);
-                        synchronized (coreOperator) {
-                            coreOperator.notifyAll();
-                        }
-                    } else {
-                        coreOperator.nextFrame(frame); // synchronous
-                    }
+    private void spill(ByteBuffer frame) throws HyracksDataException {
+        if (spiller.switchToMemory()) {
+            synchronized (this) {
+                // Check if there is memory
+                ByteBuffer next = getFreeBuffer(frame.capacity());
+                if (next != null) {
+                    spiller.close();
+                    numProcessedInMemory++;
+                    next.put(frame);
+                    inbox.offer(next);
+                    mode = Mode.PROCESS;
                 } else {
-                    DataBucket bucket = pool.getDataBucket();
-                    if (bucket != null) {
-                        if (frame != null) {
-                            bucket.reset(frame); // created a copy here
-                            bucket.setContentType(ContentType.DATA);
-                        } else {
-                            bucket.setContentType(ContentType.EOD);
-                            setFinished(true);
-                            synchronized (coreOperator) {
-                                coreOperator.notifyAll();
-                            }
-                        }
-                        // TODO: fix handling of eod case with monitored buffers.
-                        bucket.setDesiredReadCount(1);
-                        mBuffer.sendMessage(bucket);
-                        mBuffer.sendReport(frame);
-                        nProcessed++;
-                    } else {
-                        if (fpa.spillToDiskOnCongestion()) {
-                            if (frame != null) {
-                                boolean spilled = spiller.processMessage(frame);
-                                if (spilled) {
-                                    setMode(Mode.SPILL);
-                                } else {
-                                    reportUnresolvableCongestion();
-                                }
-                            }
-                        } else if (fpa.discardOnCongestion()) {
-                            boolean discarded = discarder.processMessage(frame);
-                            if (!discarded) {
-                                reportUnresolvableCongestion();
-                            }
-                        } else if (fpa.throttlingEnabled()) {
-                            setThrottlingEnabled(true);
-                        } else {
-                            reportUnresolvableCongestion();
-                        }
-
-                    }
+                    // Spill. This should always succeed since nothing has been spilled yet (TODO: verify the spill budget can't be 0).
+                    spiller.spill(frame);
+                    numSpilled++;
+                    notify();
                 }
-                frameProcessed = true;
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (feedPolicyAccessor.continueOnSoftFailure()) {
-                    frame = exceptionHandler.handleException(e, frame);
-                    if (frame == null) {
-                        frameProcessed = true;
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Encountered exception! " + e.getMessage()
-                                    + "Insufficient information, Cannot extract failing tuple");
-                        }
-                    }
+            }
+        } else {
+            // try to spill. If failed switch to either discard or stall
+            if (spiller.spill(frame)) {
+                numSpilled++;
+            } else {
+                if (fpa.discardOnCongestion()) {
+                    discard(frame);
                 } else {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
-                    }
-                    mBuffer.close(false);
-                    throw new HyracksDataException(e);
+                    stall(frame);
                 }
             }
         }
     }
 
-    private void spill(ByteBuffer frame) throws Exception {
-        boolean success = spiller.processMessage(frame);
-        if (!success) {
-            // limit reached
-            setMode(Mode.POST_SPILL_DISCARD);
-            reportUnresolvableCongestion();
-        }
-    }
-
-    private void discard(ByteBuffer frame) throws Exception {
-        boolean success = discarder.processMessage(frame);
-        if (!success) { // limit reached
-            reportUnresolvableCongestion();
-        }
-    }
-
     public Mode getMode() {
         return mode;
     }
 
     public synchronized void setMode(Mode mode) {
-        if (mode.equals(this.mode)) {
-            return;
-        }
-        this.lastMode = this.mode;
         this.mode = mode;
-        if (mode.equals(Mode.END)) {
-            this.close();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
-        }
     }
 
     @Override
-    public void close() {
-        boolean disableMonitoring = !this.mode.equals(Mode.STALL);
-        if (frameCollection != null) {
-            feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
-        }
-        if (pool != null) {
-            feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
-        }
-        mBuffer.close(false, disableMonitoring);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring " + disableMonitoring
-                    + " Mode for runtime " + this.mode);
+    public void flush() throws HyracksDataException {
+        synchronized (writer) {
+            writer.flush();
         }
     }
 
-    public IFrameWriter getCoreOperator() {
-        return coreOperator;
-    }
-
-    public void setCoreOperator(IFrameWriter coreOperator) {
-        this.coreOperator = coreOperator;
-        mBuffer.setFrameWriter(coreOperator);
-        frameEventCallback.setCoreOperator(coreOperator);
-    }
-
-    public boolean isFinished() {
-        return finished;
+    public int getNumDiscarded() {
+        return numDiscarded;
     }
 
-    public void setFinished(boolean finished) {
-        this.finished = finished;
+    public int getNumSpilled() {
+        return numSpilled;
     }
 
-    public long getProcessed() {
-        return nProcessed;
+    public int getNumProcessedInMemory() {
+        return numProcessedInMemory;
     }
 
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
+    public int getNumStalled() {
+        return numStalled;
     }
 
-    @Override
-    public void open() throws HyracksDataException {
-        coreOperator.open();
-        mBuffer.start();
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        coreOperator.fail();
-    }
+    private class FrameTransporter implements Runnable {
+        private volatile Throwable cause;
 
-    public void reset(int nPartitions) {
-        this.mBuffer.setNumberOfPartitions(nPartitions);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
+        public Throwable cause() {
+            return cause;
         }
-        mBuffer.reset();
-    }
 
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public IFeedManager getFeedManager() {
-        return feedManager;
-    }
-
-    public MonitoredBuffer getmBuffer() {
-        return mBuffer;
-    }
-
-    public boolean isThrottlingEnabled() {
-        return throttlingEnabled;
-    }
-
-    public void setThrottlingEnabled(boolean throttlingEnabled) {
-        if (this.throttlingEnabled != throttlingEnabled) {
-            this.throttlingEnabled = throttlingEnabled;
-            IFeedMessage throttlingEnabledMesg = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
-            feedManager.getFeedMessageService().sendMessage(throttlingEnabledMesg);
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
+        private Throwable consume(ByteBuffer frame) {
+            while (frame != null) {
+                try {
+                    writer.nextFrame(frame);
+                    frame = null;
+                } catch (HyracksDataException e) {
+                    // Let the exception handler try to produce a frame without the failing tuple.
+                    frame = exceptionHandler.handle(e, frame);
+                    if (frame == null) {
+                        this.cause = e;
+                        return e;
+                    }
+                } catch (Throwable th) {
+                    this.cause = th;
+                    return th;
+                }
             }
+            return null;
         }
-    }
-
-    public boolean isBufferingEnabled() {
-        return bufferingEnabled;
-    }
-
-    public void setBufferingEnabled(boolean bufferingEnabled) {
-        this.bufferingEnabled = bufferingEnabled;
-    }
 
-    @Override
-    public void flush() throws HyracksDataException {
-        // Only flush when in process mode.
-        if (mode == Mode.PROCESS) {
-            coreOperator.flush();
+        @Override
+        public void run() {
+            try {
+                ByteBuffer frame = inbox.poll();
+                while (frame != POISON_PILL) {
+                    if (frame != null) {
+                        try {
+                            if (consume(frame) != null) {
+                                return;
+                            }
+                        } finally {
+                            // Done with frame.
+                            memoryManager.release(frame);
+                        }
+                    }
+                    frame = inbox.poll();
+                    if (frame == null) {
+                        // Memory queue is empty. Check spill
+                        frame = spiller.next();
+                        while (frame != null) {
+                            if (consume(frame) != null) {
+                                // We don't release the frame since this is a spill frame that we didn't get from memory
+                                // manager
+                                return;
+                            }
+                            frame = spiller.next();
+                        }
+                        writer.flush();
+                        // At this point we have consumed everything in memory and everything spilled to disk.
+                        // We can't assume the next frame will arrive in memory (the memory budget could be 0),
+                        // so wait until the producer signals that new input is available.
+                        synchronized (FeedRuntimeInputHandler.this) {
+                            frame = inbox.poll();
+                            if (frame == null) {
+                                // Nothing in memory
+                                if (spiller.switchToMemory()) {
+                                    // Nothing in disk
+                                    FeedRuntimeInputHandler.this.wait();
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+            // cleanup will always be done through the close() call
         }
     }
 }

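The rewritten input handler replaces the old MonitoredBuffer/DataBucket machinery with a single consumer thread that drains a LinkedBlockingDeque and stops when it sees a zero-length sentinel buffer. Below is a stripped-down sketch of that poison-pill producer/consumer pattern in plain Java; PoisonPillConsumer is a hypothetical class and omits the spilling and memory-budget handling the real handler adds on top.

    import java.nio.ByteBuffer;
    import java.util.concurrent.LinkedBlockingDeque;

    public class PoisonPillConsumer implements Runnable {
        private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
        private final LinkedBlockingDeque<ByteBuffer> inbox = new LinkedBlockingDeque<>();
        private final Thread consumerThread = new Thread(this);

        public void start() {
            consumerThread.start();
        }

        // Producer side: hand a frame to the consumer thread.
        public void offer(ByteBuffer frame) {
            inbox.offer(frame);
        }

        // Producer side: request shutdown once everything queued so far is consumed.
        public void close() throws InterruptedException {
            inbox.add(POISON_PILL);
            consumerThread.join();
        }

        @Override
        public void run() {
            try {
                // take() blocks until a frame arrives; the real handler uses poll()
                // plus wait/notify so it can also drain frames spilled to disk.
                ByteBuffer frame = inbox.take();
                while (frame != POISON_PILL) { // reference equality identifies the sentinel
                    process(frame);
                    frame = inbox.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void process(ByteBuffer frame) {
            // The real handler forwards the frame to the downstream IFrameWriter here.
        }
    }
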
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
new file mode 100644
index 0000000..4a2120a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import rx.functions.Action1;
+
+public class FrameAction implements Action1<ByteBuffer> {
+    private final LinkedBlockingDeque<ByteBuffer> inbox;
+    private ByteBuffer frame;
+
+    public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) {
+        this.inbox = inbox;
+    }
+
+    @Override
+    public void call(ByteBuffer freeFrame) {
+        freeFrame.put(frame);
+        inbox.add(freeFrame);
+        synchronized (this) {
+            notify();
+        }
+    }
+
+    public ByteBuffer getFrame() {
+        return frame;
+    }
+
+    public void setFrame(ByteBuffer frame) {
+        this.frame = frame;
+    }
+
+    public int getSize() {
+        return frame.capacity();
+    }
+}
\ No newline at end of file

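FrameAction is the callback half of a simple handshake: the stalled producer parks its pending frame on the action, subscribes it to the memory manager, and waits; when a free buffer becomes available, call() copies the pending frame into it, enqueues it on the inbox, and wakes the waiter. A generic sketch of that handshake without the rx and AsterixDB types follows; FrameHandoff and its method names are illustrative. The real FrameAction has no "granted" flag because the caller holds the action's monitor across subscribe() and wait(), which prevents the lost wake-up this flag guards against.

    import java.nio.ByteBuffer;
    import java.util.concurrent.LinkedBlockingDeque;

    public class FrameHandoff {
        private final LinkedBlockingDeque<ByteBuffer> inbox;
        private ByteBuffer pending;
        private boolean granted = false;

        public FrameHandoff(LinkedBlockingDeque<ByteBuffer> inbox) {
            this.inbox = inbox;
        }

        // Producer side: remember the frame that is waiting for memory.
        public void setPending(ByteBuffer pending) {
            this.pending = pending;
        }

        // Callback side: invoked by whoever eventually has a free buffer to grant.
        public void grant(ByteBuffer freeBuffer) {
            freeBuffer.put(pending);   // copy the stalled frame into the granted buffer
            inbox.add(freeBuffer);     // make it visible to the consumer thread
            synchronized (this) {
                granted = true;
                notify();              // wake the producer blocked in awaitGrant()
            }
        }

        // Producer side: block until grant() has run.
        public synchronized void awaitGrant() throws InterruptedException {
            while (!granted) {
                wait();
            }
            granted = false;           // reset so the handoff can be reused
        }
    }
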
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
deleted file mode 100644
index 7980712..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-
-/**
- * Represents an expandable collection of frames.
- */
-public class FrameCollection implements IFeedMemoryComponent {
-
-    /** A unique identifier for the feed memory component **/
-    private final int componentId;
-
-    /** A collection of frames (each being a ByteBuffer) **/
-    private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
-
-    /** The permitted maximum size, the collection may grow to **/
-    private int maxSize;
-
-    /** The {@link IFeedMemoryManager} for the NodeController **/
-    private final IFeedMemoryManager memoryManager;
-
-    public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
-        this.componentId = componentId;
-        this.maxSize = maxSize;
-        this.memoryManager = memoryManager;
-    }
-
-    public boolean addFrame(ByteBuffer frame) {
-        if (frames.size() == maxSize) {
-            boolean expansionGranted = memoryManager.expandMemoryComponent(this);
-            if (!expansionGranted) {
-                return false;
-            }
-        }
-        ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
-        storageBuffer.put(frame);
-        frames.add(storageBuffer);
-        storageBuffer.flip();
-        return true;
-    }
-
-    public Iterator<ByteBuffer> getFrameCollectionIterator() {
-        return frames.iterator();
-    }
-
-    @Override
-    public int getTotalAllocation() {
-        return frames.size();
-    }
-
-    @Override
-    public Type getType() {
-        return Type.COLLECTION;
-    }
-
-    @Override
-    public int getComponentId() {
-        return componentId;
-    }
-
-    @Override
-    public void expand(int delta) {
-        maxSize = maxSize + delta;
-    }
-
-    @Override
-    public void reset() {
-        frames.clear();
-        maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
-    }
-
-    @Override
-    public String toString() {
-        return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
-    }
-
-}

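One detail of the removed FrameCollection worth keeping in mind: addFrame() deep-copies each incoming frame, since Hyracks reuses the caller's frame buffer and keeping a bare reference would let later frames overwrite buffered data (the new input handler copies into a pooled buffer for the same reason). A minimal sketch of that copy-on-add behavior; CopyOnAddBuffer is an illustrative name only.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class CopyOnAddBuffer {
        private final List<ByteBuffer> frames = new ArrayList<>();

        public boolean add(ByteBuffer frame) {
            ByteBuffer copy = ByteBuffer.allocate(frame.capacity());
            copy.put(frame.duplicate()); // duplicate() leaves the caller's position untouched
            copy.flip();                 // make the copy readable from the beginning
            frames.add(copy);
            return true;                 // FrameCollection returns false once its memory budget is exhausted
        }

        public List<ByteBuffer> frames() {
            return frames;
        }
    }
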
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
index 802a791..c8b7406 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -21,360 +21,162 @@ package org.apache.asterix.external.feed.dataflow;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.log4j.Logger;
 
-public class FrameDistributor {
+public class FrameDistributor implements IFrameWriter {
 
-    private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
-
-    private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
-
-    private final FeedId feedId;
-    private final FeedRuntimeType feedRuntimeType;
-    private final int partition;
-    private final IFeedMemoryManager memoryManager;
-    private final boolean enableSynchronousTransfer;
+    public static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
     /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
-    private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
-    private final FrameTupleAccessor fta;
-
-    private DataBucketPool pool;
-    private DistributionMode distributionMode;
-    private boolean spillToDiskRequired = false;
-
-    public enum DistributionMode {
-        /**
-         * A single feed frame collector is registered for receiving tuples.
-         * Tuple is sent via synchronous call, that is no buffering is involved
-         **/
-        SINGLE,
-
-        /**
-         * Multiple feed frame collectors are concurrently registered for
-         * receiving tuples.
-         **/
-        SHARED,
+    private final Map<FeedConnectionId, FeedFrameCollector> registeredCollectors;
+    private Throwable rootFailureCause = null;
 
-        /**
-         * Feed tuples are not being processed, irrespective of # of registered
-         * feed frame collectors.
-         **/
-        INACTIVE
+    public FrameDistributor() throws HyracksDataException {
+        this.registeredCollectors = new HashMap<FeedConnectionId, FeedFrameCollector>();
     }
 
-    public FrameDistributor(FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
-            boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
-            throws HyracksDataException {
-        this.feedId = feedId;
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.memoryManager = memoryManager;
-        this.enableSynchronousTransfer = enableSynchronousTransfer;
-        this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
-        this.distributionMode = DistributionMode.INACTIVE;
-        this.fta = fta;
-    }
-
-    public void notifyEndOfFeed() throws InterruptedException {
-        DataBucket bucket = getDataBucket();
-        if (bucket != null) {
-            sendEndOfFeedDataBucket(bucket);
-        } else {
-            while (bucket == null) {
+    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException {
+        if (rootFailureCause != null) {
+            throw new HyracksDataException("attempt to register to a failed feed data provider", rootFailureCause);
+        }
+        // registering a new collector.
+        try {
+            frameCollector.open();
+        } catch (Throwable th) {
+            rootFailureCause = th;
+            try {
+                frameCollector.fail();
+            } catch (Throwable failThrowable) {
+                th.addSuppressed(failThrowable);
+            } finally {
                 try {
-                    Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
-                    bucket = getDataBucket();
-                } catch (InterruptedException e) {
-                    break;
+                    frameCollector.close();
+                } catch (Throwable closeThrowable) {
+                    th.addSuppressed(closeThrowable);
                 }
             }
-            if (bucket != null) {
-                sendEndOfFeedDataBucket(bucket);
-            }
+            throw th;
         }
+        registeredCollectors.put(frameCollector.getConnectionId(), frameCollector);
     }
 
-    private void sendEndOfFeedDataBucket(DataBucket bucket) throws InterruptedException {
-        bucket.setContentType(DataBucket.ContentType.EOD);
-        nextBucket(bucket);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("End of feed data packet sent " + this.feedId);
-        }
-    }
-
-    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
-        DistributionMode currentMode = distributionMode;
-        switch (distributionMode) {
-            case INACTIVE:
-                if (!enableSynchronousTransfer) {
-                    pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
-                    frameCollector.start();
-                }
-                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
-                setMode(DistributionMode.SINGLE);
-                break;
-            case SINGLE:
-                pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
-                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
-                for (FeedFrameCollector reader : registeredCollectors.values()) {
-                    reader.start();
-                }
-                setMode(DistributionMode.SHARED);
-                break;
-            case SHARED:
-                frameCollector.start();
-                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
-                break;
-        }
-        evaluateIfSpillIsEnabled();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(
-                    "Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id " + feedId);
-        }
+    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException {
+        deregisterFrameCollector(frameCollector.getConnectionId());
     }
 
-    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
-        switch (distributionMode) {
-            case INACTIVE:
-                throw new IllegalStateException(
-                        "Invalid attempt to deregister frame collector in " + distributionMode + " mode.");
-            case SHARED:
-                frameCollector.closeCollector();
-                registeredCollectors.remove(frameCollector.getFrameWriter());
-                int nCollectors = registeredCollectors.size();
-                if (nCollectors == 1) {
-                    FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
-                    setMode(DistributionMode.SINGLE);
-                    loneCollector.setState(FeedFrameCollector.State.TRANSITION);
-                    loneCollector.closeCollector();
-                    memoryManager.releaseMemoryComponent(pool);
-                    evaluateIfSpillIsEnabled();
-                } else {
-                    if (!spillToDiskRequired) {
-                        evaluateIfSpillIsEnabled();
-                    }
-                }
-                break;
-            case SINGLE:
-                frameCollector.closeCollector();
-                setMode(DistributionMode.INACTIVE);
-                spillToDiskRequired = false;
-                break;
-
+    public synchronized void deregisterFrameCollector(FeedConnectionId connectionId) throws HyracksDataException {
+        if (rootFailureCause != null) {
+            throw new HyracksDataException("attempt to deregister from a failed feed data provider", rootFailureCause);
         }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
-        }
-    }
-
-    public void evaluateIfSpillIsEnabled() {
-        if (!spillToDiskRequired) {
-            for (FeedFrameCollector collector : registeredCollectors.values()) {
-                spillToDiskRequired = spillToDiskRequired
-                        || collector.getFeedPolicyAccessor().spillToDiskOnCongestion();
-                if (spillToDiskRequired) {
-                    break;
-                }
-            }
-        }
-    }
-
-    public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
-        FeedFrameCollector collector = registeredCollectors.get(frameWriter);
-        if (collector != null) {
-            deregisterFrameCollector(collector);
-            return true;
+        FeedFrameCollector frameCollector = removeFrameCollector(connectionId);
+        try {
+            frameCollector.close();
+        } catch (Throwable th) {
+            rootFailureCause = th;
+            throw th;
         }
-        return false;
     }
 
-    public synchronized void setMode(DistributionMode mode) {
-        this.distributionMode = mode;
-    }
-
-    public boolean isRegistered(IFrameWriter writer) {
-        return registeredCollectors.get(writer) != null;
+    public synchronized FeedFrameCollector removeFrameCollector(FeedConnectionId connectionId) {
+        return registeredCollectors.remove(connectionId);
     }
 
+    /*
+     * Failure handling on nextFrame():
+     * 0. Mark the failure so no collector can register or deregister afterwards.
+     * 1. Rethrow the throwable to the caller.
+     * 2. When fail() is called, call fail() on every registered collector.
+     * 3. Close all registered collectors.
+     * (non-Javadoc)
+     * @see org.apache.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
+     */
+    @Override
     public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        try {
-            switch (distributionMode) {
-                case INACTIVE:
-                    break;
-                case SINGLE:
-                    FeedFrameCollector collector = registeredCollectors.values().iterator().next();
-                    switch (collector.getState()) {
-                        case HANDOVER:
-                        case ACTIVE:
-                            if (enableSynchronousTransfer) {
-                                collector.nextFrame(frame); // processing is synchronous
-                            } else {
-                                handleDataBucket(frame);
-                            }
-                            break;
-                        case TRANSITION:
-                            handleDataBucket(frame);
-                            break;
-                        case FINISHED:
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Discarding fetched tuples, feed has ended ["
-                                        + registeredCollectors.get(0) + "]" + " Feed Id " + feedId
-                                        + " frame distributor " + this.getFeedRuntimeType());
-                            }
-                            registeredCollectors.remove(0);
-                            break;
-                    }
-                    break;
-                case SHARED:
-                    handleDataBucket(frame);
-                    break;
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+        if (rootFailureCause != null) {
+            throw new HyracksDataException(rootFailureCause);
         }
-    }
-
-    private void nextBucket(DataBucket bucket) throws InterruptedException {
         for (FeedFrameCollector collector : registeredCollectors.values()) {
-            collector.sendMessage(bucket); // asynchronous call
-        }
-    }
-
-    private void handleDataBucket(ByteBuffer frame) throws HyracksDataException, InterruptedException {
-        DataBucket bucket = getDataBucket();
-        if (bucket == null) {
-            handleFrameDuringMemoryCongestion(frame);
-        } else {
-            bucket.reset(frame);
-            bucket.setDesiredReadCount(registeredCollectors.size());
-            nextBucket(bucket);
-        }
-    }
-
-    private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
-        }
-        // wait till memory is available
-    }
-
-    private DataBucket getDataBucket() {
-        DataBucket bucket = null;
-        if (pool != null) {
-            bucket = pool.getDataBucket();
-            if (bucket != null) {
-                bucket.setDesiredReadCount(registeredCollectors.size());
-                return bucket;
-            } else {
-                return null;
+            try {
+                collector.nextFrame(frame);
+            } catch (Throwable th) {
+                rootFailureCause = th;
+                throw th;
             }
         }
-        return null;
-    }
-
-    public DistributionMode getMode() {
-        return distributionMode;
     }
 
-    public void close() throws HyracksDataException {
-        try {
-            switch (distributionMode) {
-                case INACTIVE:
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("FrameDistributor is " + distributionMode);
+    @Override
+    public void fail() throws HyracksDataException {
+        Collection<FeedFrameCollector> collectors = registeredCollectors.values();
+        Iterator<FeedFrameCollector> it = collectors.iterator();
+        while (it.hasNext()) {
+            FeedFrameCollector collector = it.next();
+            try {
+                collector.fail();
+            } catch (Throwable th) {
+                while (it.hasNext()) {
+                    FeedFrameCollector innerCollector = it.next();
+                    try {
+                        innerCollector.fail();
+                    } catch (Throwable innerTh) {
+                        th.addSuppressed(innerTh);
                     }
-                    break;
-                case SINGLE:
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode "
-                                + " for  feedId " + feedId + " " + this.feedRuntimeType);
-                    }
-                    setMode(DistributionMode.INACTIVE);
-                    if (!enableSynchronousTransfer) {
-                        notifyEndOfFeed(); // send EOD Data Bucket
-                        waitForCollectorsToFinish();
-                    }
-                    registeredCollectors.values().iterator().next().disconnect();
-                    break;
-                case SHARED:
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
-                    }
-                    notifyEndOfFeed(); // send EOD Data Bucket
-                    waitForCollectorsToFinish();
-                    break;
+                }
+                throw th;
             }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
         }
     }
 
-    private void waitForCollectorsToFinish() {
-        synchronized (registeredCollectors.values()) {
-            while (!allCollectorsFinished()) {
-                try {
-                    registeredCollectors.values().wait();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
+    @Override
+    public void close() throws HyracksDataException {
+        Collection<FeedFrameCollector> collectors = registeredCollectors.values();
+        Iterator<FeedFrameCollector> it = collectors.iterator();
+        while (it.hasNext()) {
+            FeedFrameCollector collector = it.next();
+            try {
+                collector.close();
+            } catch (Throwable th) {
+                while (it.hasNext()) {
+                    FeedFrameCollector innerCollector = it.next();
+                    try {
+                        innerCollector.close();
+                    } catch (Throwable innerTh) {
+                        th.addSuppressed(innerTh);
+                    } finally {
+                        innerCollector.setState(State.FINISHED);
+                    }
                 }
+                // remaining collectors have been closed and marked FINISHED; rethrow the original failure
+                throw th;
+            } finally {
+                collector.setState(State.FINISHED);
             }
         }
     }
 
-    private boolean allCollectorsFinished() {
-        boolean allFinished = true;
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            allFinished = allFinished && collector.getState().equals(FeedFrameCollector.State.FINISHED);
-        }
-        return allFinished;
-    }
-
-    public Collection<FeedFrameCollector> getRegisteredCollectors() {
-        return registeredCollectors.values();
-    }
-
-    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
-        return registeredCollectors;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public DistributionMode getDistributionMode() {
-        return distributionMode;
-    }
-
-    public FeedRuntimeType getFeedRuntimeType() {
-        return feedRuntimeType;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public FrameTupleAccessor getFta() {
-        return fta;
-    }
-
+    @Override
     public void flush() throws HyracksDataException {
-        switch (distributionMode) {
-            case SINGLE:
-                FeedFrameCollector collector = registeredCollectors.values().iterator().next();
+        if (rootFailureCause != null) {
+            throw new HyracksDataException(rootFailureCause);
+        }
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            try {
                 collector.flush();
-            default:
-                break;
+            } catch (Throwable th) {
+                rootFailureCause = th;
+                throw th;
+            }
         }
     }
 
+    @Override
+    public void open() throws HyracksDataException {
+        // Nothing to do here :)
+    }
 }
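
The refactored FrameDistributor above reduces to a fan-out IFrameWriter that latches the first failure it observes in rootFailureCause and then rejects further registration, deregistration, and frames. The following is a minimal, self-contained sketch of that pattern in plain Java; the Fanout and Sink names are invented for illustration and are not part of this commit.

    import java.nio.ByteBuffer;
    import java.util.LinkedHashMap;
    import java.util.Map;

    interface Sink {
        void open() throws Exception;
        void nextFrame(ByteBuffer frame) throws Exception;
    }

    class Fanout {
        private final Map<String, Sink> sinks = new LinkedHashMap<>();
        private Throwable rootFailureCause;

        synchronized void register(String id, Sink sink) throws Exception {
            if (rootFailureCause != null) {
                throw new IllegalStateException("provider already failed", rootFailureCause);
            }
            sink.open();              // open before the sink becomes visible to nextFrame()
            sinks.put(id, sink);
        }

        synchronized void nextFrame(ByteBuffer frame) throws Exception {
            if (rootFailureCause != null) {
                throw new IllegalStateException("provider already failed", rootFailureCause);
            }
            for (Sink sink : sinks.values()) {
                try {
                    sink.nextFrame(frame);   // synchronous push to every registered sink
                } catch (Throwable th) {
                    rootFailureCause = th;   // latch the first failure so later calls fail fast
                    throw th;                // precise rethrow keeps the declared checked types
                }
            }
        }
    }

Compared with the deleted DataBucket/DistributionMode machinery, this keeps every collector on the writer's thread: failure handling lives in one place, but a slow collector now stalls the whole fan-out.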

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
deleted file mode 100644
index f1499fb..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FrameEventCallback implements IFrameEventCallback {
-
-    private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
-
-    private final FeedPolicyAccessor fpa;
-    private final FeedRuntimeInputHandler inputSideHandler;
-    private IFrameWriter coreOperator;
-
-    public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
-            IFrameWriter coreOperator) {
-        this.fpa = fpa;
-        this.inputSideHandler = inputSideHandler;
-        this.coreOperator = coreOperator;
-    }
-
-    @Override
-    public void frameEvent(FrameEvent event) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
-        }
-        if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
-                && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
-            return;
-        }
-        switch (event) {
-            case PENDING_WORK_THRESHOLD_REACHED:
-                if (fpa.spillToDiskOnCongestion()) {
-                    inputSideHandler.setMode(Mode.SPILL);
-                } else if (fpa.discardOnCongestion()) {
-                    inputSideHandler.setMode(Mode.DISCARD);
-                } else if (fpa.throttlingEnabled()) {
-                    inputSideHandler.setThrottlingEnabled(true);
-                } else {
-                    try {
-                        inputSideHandler.reportUnresolvableCongestion();
-                    } catch (HyracksDataException e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to report congestion!!!");
-                        }
-                    }
-                }
-                break;
-            case FINISHED_PROCESSING:
-                inputSideHandler.setFinished(true);
-                synchronized (coreOperator) {
-                    coreOperator.notifyAll();
-                }
-                break;
-            case PENDING_WORK_DONE:
-                switch (inputSideHandler.getMode()) {
-                    case SPILL:
-                    case DISCARD:
-                    case POST_SPILL_DISCARD:
-                        inputSideHandler.setMode(Mode.PROCESS);
-                        break;
-                    default:
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
-                        }
-                }
-                break;
-            case FINISHED_PROCESSING_SPILLAGE:
-                inputSideHandler.setMode(Mode.PROCESS);
-                break;
-            default:
-                break;
-        }
-    }
-
-    public void setCoreOperator(IFrameWriter coreOperator) {
-        this.coreOperator = coreOperator;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
new file mode 100644
index 0000000..f0d226a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * A {@link FrameSpiller} is used with feeds when "spill.to.disk.on.congestion" is set to true.
+ * The spiller spills excess tuples to disk if an operator cannot process incoming data at its
+ * arrival rate.
+ * The maximum size of data (tuples) that can be spilled to disk is configured using the
+ * property "max.spill.size.on.disk".
+ */
+public class FrameSpiller {
+    private static final Logger LOGGER = Logger.getLogger(FrameSpiller.class.getName());
+    private static final int FRAMES_PER_FILE = 1024;
+
+    private final String fileNamePrefix;
+    private final ArrayDeque<File> files = new ArrayDeque<>();
+    private final VSizeFrame frame;
+    private final int budget;           // Max current frames in disk allowed
+    private BufferedOutputStream bos;   // Current output stream
+    private BufferedInputStream bis;    // Current input stream
+    private File currentWriteFile;      // Current write file
+    private File currentReadFile;       // Current read file
+    private int currentWriteCount = 0;  // Current file write count
+    private int currentReadCount = 0;   // Current file read count
+    private int totalWriteCount = 0;    // Total frames spilled
+    private int totalReadCount = 0;     // Total frames read
+    private int fileCount = 0;          // How many spill files?
+
+    public FrameSpiller(IHyracksTaskContext ctx, String fileNamePrefix, long budgetInBytes)
+            throws HyracksDataException {
+        this.frame = new VSizeFrame(ctx);
+        this.fileNamePrefix = fileNamePrefix;
+        this.budget = (int) (budgetInBytes / ctx.getInitialFrameSize());
+
+    }
+
+    public void open() throws HyracksDataException {
+        try {
+            this.currentWriteFile = createFile();
+            this.currentReadFile = currentWriteFile;
+            this.bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
+            this.bis = new BufferedInputStream(new FileInputStream(currentReadFile));
+        } catch (Exception e) {
+            LOGGER.fatal("Unable to create spill file", e);
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public boolean switchToMemory() {
+        return totalWriteCount == totalReadCount;
+    }
+
+    public int remaining() {
+        return totalWriteCount - totalReadCount;
+    }
+
+    public synchronized ByteBuffer next() throws HyracksDataException {
+        try {
+            frame.reset();
+            if (totalReadCount == totalWriteCount) {
+                return null;
+            }
+            if (currentReadFile == null) {
+                if (!files.isEmpty()) {
+                    currentReadFile = files.pop();
+                    bis = new BufferedInputStream(new FileInputStream(currentReadFile));
+                } else {
+                    return null;
+                }
+            }
+            // read first frame
+            bis.read(frame.getBuffer().array(), 0, frame.getFrameSize());
+            byte frameCount = frame.getBuffer().array()[0];
+            if (frameCount > 1) {
+                // expand the frame keeping existing data
+                frame.ensureFrameSize(frame.getMinSize() * frameCount);
+                bis.read(frame.getBuffer().array(), frame.getMinSize(), frame.getFrameSize() - frame.getMinSize());
+            }
+            currentReadCount++;
+            totalReadCount++;
+            if (currentReadCount >= FRAMES_PER_FILE) {
+                currentReadCount = 0;
+                // done with the file
+                bis.close();
+                Files.delete(currentReadFile.toPath());
+                if (!files.isEmpty()) {
+                    currentReadFile = files.pop();
+                    bis = new BufferedInputStream(new FileInputStream(currentReadFile));
+                } else {
+                    currentReadFile = null;
+                }
+            }
+            return frame.getBuffer();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public double usedBudget() {
+        return ((double) (totalWriteCount - totalReadCount) / (double) budget);
+    }
+
+    public synchronized boolean spill(ByteBuffer frame) throws HyracksDataException {
+        try {
+            if (totalWriteCount - totalReadCount >= budget) {
+                return false;
+            }
+            currentWriteCount++;
+            totalWriteCount++;
+            bos.write(frame.array());
+            bos.flush();
+            if (currentWriteCount >= FRAMES_PER_FILE) {
+                bos.close();
+                currentWriteCount = 0;
+                currentWriteFile = createFile();
+                files.add(currentWriteFile);
+                bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
+            }
+            return true;
+        } catch (IOException e) {
+            close();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private File createFile() throws HyracksDataException {
+        try {
+            String fileName = fileNamePrefix + "_" + fileCount++;
+            File file = new File(fileName);
+            if (!file.exists()) {
+                boolean success = file.createNewFile();
+                if (!success) {
+                    throw new HyracksDataException("Unable to create spill file " + fileName);
+                } else {
+                    if (LOGGER.isEnabledFor(Level.INFO)) {
+                        LOGGER.info("Created spill file " + file.getAbsolutePath());
+                    }
+                }
+            }
+            return file;
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
+        }
+    }
+
+    public synchronized void close() {
+        // Close open streams, delete any remaining spill files, and reset the counters.
+        if (bos != null) {
+            try {
+                bos.flush();
+                bos.close();
+            } catch (IOException e) {
+                LOGGER.warn(e.getMessage(), e);
+            }
+        }
+        if (bis != null) {
+            try {
+                bis.close();
+            } catch (IOException e) {
+                LOGGER.warn(e.getMessage(), e);
+            }
+        }
+        if (currentReadFile != null) {
+            try {
+                Files.deleteIfExists(currentReadFile.toPath());
+            } catch (Exception e) {
+                LOGGER.warn(e.getMessage(), e);
+            }
+            currentReadFile = null;
+        }
+        while (!files.isEmpty()) {
+            File file = files.pop();
+            try {
+                Files.deleteIfExists(file.toPath());
+            } catch (Exception e) {
+                LOGGER.warn(e.getMessage(), e);
+            }
+        }
+        currentWriteCount = 0;
+        currentReadCount = 0;
+        totalWriteCount = 0;
+        totalReadCount = 0;
+    }
+}
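
As a usage illustration only, the fragment below shows one way the FrameSpiller added in this file might be driven: spill frames while the downstream writer is congested, replay them with next() once it catches up, and fall back to in-memory processing when remaining() reaches zero. The ctx, writer, frame, and congested names are assumptions about the surrounding operator, not part of this commit, and the 64 MB budget is an arbitrary example value for "max.spill.size.on.disk".

    // Illustrative fragment; ctx (IHyracksTaskContext), writer (IFrameWriter), frame (ByteBuffer),
    // and the congested flag are assumed to be provided by the enclosing operator.
    FrameSpiller spiller = new FrameSpiller(ctx, "Feeds/TwitterFeed_0_spill", 64L * 1024 * 1024);
    spiller.open();
    try {
        // Write path: park the frame on disk while the writer is congested, as long as the
        // frame budget derived from the byte budget is not exhausted (spill() returns false).
        boolean parked = congested && spiller.spill(frame);
        if (!parked) {
            writer.nextFrame(frame);
        }

        // Drain path: once congestion clears, replay spilled frames in arrival order.
        ByteBuffer replay;
        while ((replay = spiller.next()) != null) {
            writer.nextFrame(replay);
        }
        assert spiller.switchToMemory(); // remaining() == 0 after a complete drain
    } finally {
        spiller.close(); // closes streams and deletes any leftover spill files
    }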

