asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
Date Sun, 15 May 2016 19:03:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index cd20900..04ef016 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,30 +18,18 @@
  */
 package org.apache.asterix.external.operators;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IAdapterRuntimeManager;
-import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
-import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
-import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -55,176 +43,78 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
  */
 public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
-    private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
-
     private final FeedId feedId;
     private final int partition;
-    private final IFeedSubscriptionManager feedSubscriptionManager;
-    private final IFeedManager feedManager;
     private final IHyracksTaskContext ctx;
     private final IAdapterFactory adapterFactory;
-
-    private IngestionRuntime ingestionRuntime;
-    private FeedAdapter adapter;
-    private IIntakeProgressTracker tracker;
-    private DistributeFeedFrameWriter feedFrameWriter;
-
     private final FeedIntakeOperatorDescriptor opDesc;
 
-    private final IRecordDescriptorProvider recordDescProvider;
-
     public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
-            int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor,
-            IRecordDescriptorProvider recordDescProvider, FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
+            int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider,
+            FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
         this.opDesc = feedIntakeOperatorDescriptor;
-        this.recordDescProvider = recordDescProvider;
+        this.recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0);
         this.ctx = ctx;
         this.feedId = feedId;
         this.partition = partition;
-        this.ingestionRuntime = ingestionRuntime;
         this.adapterFactory = adapterFactory;
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
-        this.feedSubscriptionManager = feedManager.getFeedSubscriptionManager();
     }
 
     @Override
     public void initialize() throws HyracksDataException {
-        IAdapterRuntimeManager adapterRuntimeManager = null;
+        FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getFeedManager();
+        AdapterRuntimeManager adapterRuntimeManager = null;
+        DistributeFeedFrameWriter frameDistributor = null;
+        IngestionRuntime ingestionRuntime = null;
+        boolean open = false;
         try {
-            if (ingestionRuntime == null) {
-                try {
-                    adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
-                    //TODO: Fix record tracking
-                    //                    if (adapterFactory.isRecordTrackingEnabled()) {
-                    //                        tracker = adapterFactory.createIntakeProgressTracker();
-                    //                    }
-                } catch (Exception e) {
-                    LOGGER.severe("Unable to create adapter : " + adapterFactory.getAlias() + "[" + partition + "]"
-                            + " Exception " + e);
-                    throw new HyracksDataException(e);
-                }
-
-                recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0);
-                FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
-                feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
-                        fta, feedManager);
-                adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
-                SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
-                        partition);
-                ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
-                        adapterRuntimeManager, ctx);
-                feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
-                // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-                ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
-                        null);
-                feedFrameWriter.open();
-            } else {
-                if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
-                    ingestionRuntime.getAdapterRuntimeManager().setState(State.ACTIVE_INGESTION);
-                    adapter = ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
-                                + ingestionRuntime);
-                        LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
-                                + " connected to backend for feed " + feedId);
-                    }
-                    feedFrameWriter = ingestionRuntime.getFeedFrameWriter();
-                } else {
-                    String message = "Feed Ingestion Runtime for feed " + feedId
-                            + " is already registered and is active!.";
-                    LOGGER.severe(message);
-                    throw new IllegalStateException(message);
+            Thread.currentThread().setName("Intake Thread");
+            // create the adapter
+            FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
+            // create the distributor
+            frameDistributor = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
+                    new FrameTupleAccessor(recordDesc));
+            // create adapter runtime manager
+            adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition);
+            // create and register the runtime
+            FeedRuntimeId runtimeId =
+                    new FeedRuntimeId(feedId, FeedRuntimeType.INTAKE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+            ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);
+            feedManager.registerFeedSubscribableRuntime(ingestionRuntime);
+            // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
+            ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
+                    null);
+            // open the distributor
+            open = true;
+            frameDistributor.open();
+            // wait until ingestion is over
+            synchronized (adapterRuntimeManager) {
+                while (!adapterRuntimeManager.isDone()) {
+                    adapterRuntimeManager.wait();
                 }
             }
-
-            waitTillIngestionIsOver(adapterRuntimeManager);
-            feedSubscriptionManager
-                    .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
-            if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
+            // The ingestion is over. we need to remove the runtime from the manager
+            feedManager.deregisterFeedSubscribableRuntime(ingestionRuntime.getRuntimeId());
+            // If there was a failure, we need to throw an exception
+            if (adapterRuntimeManager.isFailed()) {
                 throw new HyracksDataException("Unable to ingest data");
             }
-
-        } catch (InterruptedException ie) {
+        } catch (Throwable ie) {
             /*
              * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
              * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
              * The surviving intake partitions must continue to live and receive data from the external source.
              */
-            List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
-            FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
-            boolean needToHandleFailure = false;
-            List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
-            for (ISubscriberRuntime subscriber : subscribers) {
-                policyAccessor.reset(subscriber.getFeedPolicy());
-                if (!policyAccessor.continueOnHardwareFailure()) {
-                    failingSubscribers.add(subscriber);
-                } else {
-                    needToHandleFailure = true;
-                }
+            if (ingestionRuntime != null) {
+                ingestionRuntime.terminate();
+                feedManager.deregisterFeedSubscribableRuntime(ingestionRuntime.getRuntimeId());
             }
-
-            for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
-                try {
-                    ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning(
-                                "Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
-                    }
-                }
-            }
-
-            if (needToHandleFailure) {
-                ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(
-                            "Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
-                }
-                feedSubscriptionManager
-                        .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
-                throw new HyracksDataException(ie);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(ie);
         } finally {
-            if ((ingestionRuntime != null)
-                    && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
-                feedFrameWriter.close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
-                            + ingestionRuntime.getAdapterRuntimeManager().getState());
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
-                            + " Will resume after correcting failure");
-                }
+            if (open) {
+                frameDistributor.close();
             }
-
         }
     }
-
-    private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Waiting for adaptor [" + partition + "]" + "to be done with ingestion of feed " + feedId);
-        }
-        synchronized (adapterRuntimeManager) {
-            while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION)
-                    || (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
-                adapterRuntimeManager.wait();
-            }
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
-                    + " done with ingestion of feed " + feedId);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index a92544b..82bf1da 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -18,42 +18,21 @@
  */
 package org.apache.asterix.external.operators;
 
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.api.IAdapterRuntimeManager;
-import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedMessage;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedRuntimeManager;
+import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.external.feed.message.PrepareStallMessage;
-import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.external.feed.watch.StorageSideMonitoredBuffer;
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -71,7 +50,7 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
 
     private final FeedConnectionId connectionId;
     private final IFeedMessage message;
-    private final IFeedManager feedManager;
+    private final FeedManager feedManager;
     private final int partition;
 
     public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
@@ -79,9 +58,9 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
         this.connectionId = connectionId;
         this.message = feedMessage;
         this.partition = partition;
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+        IAsterixAppRuntimeContext runtimeCtx =
+                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        this.feedManager = (FeedManager) runtimeCtx.getFeedManager();
     }
 
     @Override
@@ -100,28 +79,9 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                             break;
                     }
                     break;
-                case PREPARE_STALL: {
-                    handlePrepareStallMessage((PrepareStallMessage) message);
-                    break;
-                }
-                case TERMINATE_FLOW: {
-                    FeedConnectionId connectionId = ((TerminateDataFlowMessage) message).getConnectionId();
-                    handleTerminateFlowMessage(connectionId);
-                    break;
-                }
-                case COMMIT_ACK_RESPONSE: {
-                    handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
-                    break;
-                }
-                case THROTTLING_ENABLED: {
-                    handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
-                    break;
-                }
                 default:
                     break;
-
             }
-
         } catch (Exception e) {
             throw new HyracksDataException(e);
         } finally {
@@ -129,104 +89,12 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
         }
     }
 
-    private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
-        FeedConnectionId connectionId = throttlingMessage.getConnectionId();
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
-        for (FeedRuntimeId runtimeId : runtimes) {
-            if (runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
-                FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
-                ((StorageSideMonitoredBuffer) (storeRuntime.getInputHandler().getmBuffer())).setAcking(false);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline "
-                            + connectionId);
-                }
-            }
-        }
-    }
-
-    private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
-        FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
-        for (FeedRuntimeId runtimeId : runtimes) {
-            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
-            switch (runtimeId.getFeedRuntimeType()) {
-                case COLLECT:
-                    FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler) runtime
-                            .getInputHandler();
-                    int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
-                    inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
-                    break;
-                case STORE:
-                    MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer()
-                            .getStorageTimeTrackingRateTask();
-                    sTask.receiveCommitAckResponse(commitResponseMessage);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.INTAKE,
-                partition);
-        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
-                .getSubscribableRuntime(sid);
-        if (ingestionRuntime != null) {
-            IIntakeProgressTracker tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker();
-            if (tracker != null) {
-                tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
-            }
-        }
-    }
-
-    private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
-
-        boolean found = false;
-        for (FeedRuntimeId runtimeId : feedRuntimes) {
-            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
-            if (runtime.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
-                ((CollectionRuntime) runtime).getFrameCollector().setState(State.HANDOVER);
-                found = true;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Switched " + runtime + " to Hand Over stage");
-                }
-            }
-        }
-        if (!found) {
-            throw new HyracksDataException("COLLECT Runtime  not found!");
-        }
-    }
-
-    private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
-        FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
-        int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
-        for (FeedRuntimeId runtimeId : feedRuntimes) {
-            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
-            switch (runtimeId.getFeedRuntimeType()) {
-                case COMPUTE:
-                    Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? Mode.STALL
-                            : Mode.END;
-                    runtime.setMode(requiredMode);
-                    break;
-                default:
-                    runtime.setMode(Mode.STALL);
-                    break;
-            }
-        }
-    }
-
     private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
         FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
-        SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                FeedRuntimeType.INTAKE, partition);
-        ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
-                .getSubscribableRuntime(subscribableRuntimeId);
-        IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
+        FeedRuntimeId subscribableRuntimeId =
+                new FeedRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+        ISubscribableRuntime feedRuntime = feedManager.getSubscribableRuntime(subscribableRuntimeId);
+        AdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
         adapterRuntimeManager.stop();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
@@ -254,11 +122,12 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
             }
 
-            runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
-            CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
-                    .getFeedRuntime(connectionId, runtimeId);
+            runtimeId = new FeedRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType, partition,
+                    FeedRuntimeId.DEFAULT_TARGET_ID);
+            CollectionRuntime feedRuntime =
+                    (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
             if (feedRuntime != null) {
-                feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+                feedRuntime.getSourceRuntime().unsubscribe(feedRuntime);
             }
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
@@ -273,26 +142,12 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
                 case COMPUTE:
                     // feed could be primary or secondary, doesn't matter
-                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
-                            connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
-                    ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
-                            .getSubscribableRuntime(feedSubscribableRuntimeId);
-                    DistributeFeedFrameWriter dWriter = feedRuntime.getFeedFrameWriter();
-                    Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
-
-                    IFrameWriter unsubscribingWriter = null;
-                    for (Entry<IFrameWriter, FeedFrameCollector> entry : registeredCollectors.entrySet()) {
-                        IFrameWriter frameWriter = entry.getKey();
-                        FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler) frameWriter;
-                        if (feedFrameWriter.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
-                            unsubscribingWriter = feedFrameWriter;
-                            dWriter.unsubscribeFeed(unsubscribingWriter);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
-                            }
-                            break;
-                        }
-                    }
+                    FeedRuntimeId feedSubscribableRuntimeId = new FeedRuntimeId(connectionId.getFeedId(),
+                            FeedRuntimeType.COMPUTE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+                    ISubscribableRuntime feedRuntime = feedManager.getSubscribableRuntime(feedSubscribableRuntimeId);
+                    CollectionRuntime feedCollectionRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
+                            .getFeedRuntime(connectionId, runtimeId);
+                    feedRuntime.unsubscribe(feedCollectionRuntime);
                     break;
                 default:
                     break;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 6c9ef8d..716468e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -24,18 +24,15 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedManager;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.feed.runtime.SubscribableRuntime;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -80,10 +77,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
      **/
     private int partition;
 
-    private int nPartitions;
-
     /** The (singleton) instance of IFeedManager **/
-    private IFeedManager feedManager;
+    private FeedManager feedManager;
 
     private FrameTupleAccessor fta;
 
@@ -91,14 +86,19 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
 
     private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
 
-    private FeedRuntimeInputHandler inputSideHandler;
-
     private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
 
     private final FeedMetaOperatorDescriptor opDesc;
 
     private final IRecordDescriptorProvider recordDescProvider;
 
+    private boolean opened;
+
+    /*
+     * In this operator:
+     * writer is the network partitioner
+     * coreOperator is the first operator
+     */
     public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
             Map<String, String> feedPolicyProperties, String operationId,
@@ -108,9 +108,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
                 .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
         this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
         this.partition = partition;
-        this.nPartitions = nPartitions;
         this.connectionId = feedConnectionId;
-        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+        this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         ctx.setSharedObject(message);
         this.opDesc = feedMetaOperatorDescriptor;
@@ -119,16 +118,12 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
 
     @Override
     public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
+        FeedRuntimeId runtimeId =
+                new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
         try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
+            initializeNewFeedRuntime(runtimeId);
+            opened = true;
             writer.open();
-            coreOperator.open();
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
@@ -136,113 +131,52 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
     }
 
     private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta,
-                recordDesc, feedManager, nPartitions);
-
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
-                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
-        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
-
-        feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
-                recordDesc);
-        feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
+        fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
+        FeedPolicyAccessor fpa = policyEnforcer.getFeedPolicyAccessor();
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        if (fpa.bufferingEnabled()) {
+            writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator, fpa, fta,
+                    feedManager.getFeedMemoryManager());
+        } else {
+            writer = new SyncFeedRuntimeInputHandler(ctx, coreOperator, fta);
+        }
+        feedRuntime = new FeedRuntime(runtimeId);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
-
-        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
-    }
-
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.inputSideHandler.setCoreOperator(coreOperator);
-
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
-                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
-        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
-        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
-
-        inputSideHandler.reset(nPartitions);
-        feedRuntime.setMode(Mode.PROCESS);
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
             FeedUtils.processFeedMessage(buffer, message, fta);
-            inputSideHandler.nextFrame(buffer);
+            writer.nextFrame(buffer);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.WARNING, e.getMessage(), e);
             throw new HyracksDataException(e);
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        feedRuntime.setMode(Mode.FAIL);
-        coreOperator.fail();
+        writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
-        boolean end = inputSideHandler.getMode().equals(Mode.END);
         try {
-            if (!(stalled || end)) {
-                inputSideHandler.nextFrame(null); // signal end of data
-                while (!inputSideHandler.isFinished()) {
-                    synchronized (coreOperator) {
-                        if (inputSideHandler.isFinished()) {
-                            break;
-                        }
-                        coreOperator.wait();
-                    }
-                }
-            } else {
-                inputSideHandler.setFinished(true);
-            }
-            coreOperator.close();
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
-            }
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
+            deregister();
         } finally {
-            if (!stalled) {
-                deregister();
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("DEREGISTERING " + this.feedRuntime.getRuntimeId());
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
-                }
-            }
-            inputSideHandler.close();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            if (opened) {
+                writer.close();
             }
         }
     }
 
     private void deregister() {
-        if (feedRuntime != null) {
-            // deregister from subscription manager
-            SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
-            feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
-
-            // deregister from connection manager
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
-        }
+        feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
     }
 
     @Override
     public void flush() throws HyracksDataException {
-        inputSideHandler.flush();
+        writer.flush();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
deleted file mode 100644
index e99ae3a..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.operators;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dyanmic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private int partition;
-
-    /** Total number of partitions available **/
-    private int nPartitions;
-
-    /** Type associated with the core feed operator **/
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
-
-    /** The (singleton) instance of IFeedManager **/
-    private IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    private final String operandId;
-
-    /** The pre-processor associated with this runtime **/
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
-            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
-                .getApplicationObject()).getFeedManager();
-        this.operandId = operationId;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-            coreOperator.open();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager, nPartitions);
-
-        setupBasicRuntime(inputSideHandler);
-    }
-
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.fta = new FrameTupleAccessor(recordDesc);
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        feedRuntime.setMode(Mode.PROCESS);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
-        }
-    }
-
-    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        feedRuntime.setMode(Mode.FAIL);
-        coreOperator.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            coreOperator.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-            // ignore
-        } finally {
-            if (inputSideHandler != null) {
-                inputSideHandler.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
-            }
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        inputSideHandler.flush();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index 6591795..c7dd3d2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -101,16 +100,6 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
                 nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
                         coreOperator, feedConnectionId, feedPolicyProperties, operandId, this);
                 break;
-            case OTHER:
-                nodePushable = new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                        feedConnectionId, feedPolicyProperties, operandId);
-                break;
-            case ETS:
-                nodePushable = ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx,
-                        recordDescProvider, partition, nPartitions);
-                break;
-            case JOIN:
-                break;
             default:
                 throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index f11b948..b79707b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -25,15 +25,16 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -48,7 +49,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
 
     /** Runtime node pushable corresponding to the core feed operator **/
-    private final AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
 
     /**
      * A policy enforcer that ensures dyanmic decisions for a feed are taken
@@ -74,21 +75,17 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
      **/
     private final int partition;
 
-    private final int nPartitions;
-
     /** Type associated with the core feed operator **/
     private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
 
     /** The (singleton) instance of IFeedManager **/
-    private final IFeedManager feedManager;
+    private final FeedManager feedManager;
 
     private FrameTupleAccessor fta;
 
     private final IHyracksTaskContext ctx;
 
-    private final String operandId;
-
-    private FeedRuntimeInputHandler inputSideHandler;
+    private final String targetId;
 
     private final ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
 
@@ -98,18 +95,17 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId,
+            Map<String, String> feedPolicyProperties, String targetId,
             FeedMetaOperatorDescriptor feedMetaOperatorDescriptor) throws HyracksDataException {
         this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+        this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
         this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
         this.partition = partition;
-        this.nPartitions = nPartitions;
         this.connectionId = feedConnectionId;
-        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+        this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
-        this.operandId = operationId;
+        this.targetId = targetId;
         ctx.setSharedObject(message);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
@@ -117,15 +113,10 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     @Override
     public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, targetId);
         try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-            coreOperator.open();
+            initializeNewFeedRuntime(runtimeId);
+            insertOperator.open();
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Failed to open feed store operator", e);
             throw new HyracksDataException(e);
@@ -133,39 +124,29 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     }
 
     private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Runtime not found for  " + runtimeId + " connection id " + connectionId);
-        }
-        this.fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta,
-                recordDesc, feedManager, nPartitions);
-        if (coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
-            AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
+        fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
+        insertOperator.setOutputFrameWriter(0, writer, recordDesc);
+        if (insertOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
+            AsterixLSMInsertDeleteOperatorNodePushable indexOp =
+                    (AsterixLSMInsertDeleteOperatorNodePushable) insertOperator;
             if (!indexOp.isPrimary()) {
-                inputSideHandler.setBufferingEnabled(false);
+                writer = insertOperator;
+                setupBasicRuntime(writer);
+                return;
             }
         }
-        setupBasicRuntime(inputSideHandler);
-    }
-
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.fta = new FrameTupleAccessor(recordDesc);
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        this.inputSideHandler.reset(nPartitions);
-        this.inputSideHandler.setCoreOperator(coreOperator);
-        feedRuntime.setMode(Mode.PROCESS);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning(
-                    "Retreived state from the zombie instance from previous execution for " + runtimeType + " node.");
+        if (policyEnforcer.getFeedPolicyAccessor().bufferingEnabled()) {
+            writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, insertOperator,
+                    policyEnforcer.getFeedPolicyAccessor(), fta, feedManager.getFeedMemoryManager());
+        } else {
+            writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta);
         }
+        setupBasicRuntime(writer);
     }
 
-    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+    private void setupBasicRuntime(IFrameWriter frameWriter) throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, targetId);
+        feedRuntime = new FeedRuntime(runtimeId);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
     }
 
@@ -173,7 +154,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
             FeedUtils.processFeedMessage(buffer, message, fta);
-            inputSideHandler.nextFrame(buffer);
+            writer.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
@@ -182,68 +163,25 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     @Override
     public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        feedRuntime.setMode(Mode.FAIL);
-        coreOperator.fail();
+        writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("CLOSE CALLED FOR " + this.feedRuntime.getRuntimeId());
-        }
-        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
         try {
-            if (!stalled) {
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("SIGNALLING END OF DATA for " + this.feedRuntime.getRuntimeId() + " mode is "
-                            + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
-                }
-                inputSideHandler.nextFrame(null); // signal end of data
-                while (!inputSideHandler.isFinished()) {
-                    synchronized (coreOperator) {
-                        if (inputSideHandler.isFinished()) {
-                            break;
-                        }
-                        coreOperator.wait();
-                    }
-                }
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("ABOUT TO CLOSE OPERATOR  " + coreOperator);
-                }
-            }
-            coreOperator.close();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+            writer.close();
         } finally {
-            if (!stalled) {
-                deregister();
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("DEREGISTERING " + this.feedRuntime.getRuntimeId());
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
-                }
-            }
-            inputSideHandler.close();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
-            }
+            deregister();
         }
     }
 
     private void deregister() {
-        if (feedRuntime != null) {
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
-        }
+        feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
     }
 
     @Override
     public void flush() throws HyracksDataException {
-        inputSideHandler.flush();
+        writer.flush();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
index f16e24b..d986648 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
@@ -43,7 +43,7 @@ public class ExternalDataExceptionUtils {
         return true;
     }
 
-    public static HyracksDataException suppress(HyracksDataException hde, Throwable th) {
+    public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
         if (hde == null) {
             return new HyracksDataException(th);
         } else {
@@ -51,4 +51,20 @@ public class ExternalDataExceptionUtils {
             return hde;
         }
     }
+
+    public static Throwable suppress(Throwable suppressor, Throwable suppressed) {
+        if (suppressor == null) {
+            return suppressed;
+        } else if (suppressed != null) {
+            suppressor.addSuppressed(suppressed);
+        }
+        return suppressor;
+    }
+
+    public static HyracksDataException convertToHyracksDataException(Throwable throwable) {
+        if (throwable == null || throwable instanceof HyracksDataException) {
+            return (HyracksDataException) throwable;
+        }
+        return new HyracksDataException(throwable);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
new file mode 100644
index 0000000..8a6d1b6
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class FeedMemoryManagerUnitTest extends TestCase {
+
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int NUM_FRAMES = 2048;
+    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
+    private static final int NUM_THREADS = 8;
+    private static final int MAX_SIZE = 52;
+    private static final double RELEASE_PROBABILITY = 0.20;
+
+    public FeedMemoryManagerUnitTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(FeedMemoryManagerUnitTest.class);
+    }
+
+    @org.junit.Test
+    public void testMemoryManager() {
+        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+        Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+        ConcurrentFramePool fmm =
+                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+        int i = 0;
+        while (fmm.get() != null) {
+            i++;
+        }
+        Assert.assertEquals(i, NUM_FRAMES);
+    }
+
+    @org.junit.Test
+    public void testConcurrentMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (FixedSizeAllocator allocator : runners) {
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+    }
+
+    @org.junit.Test
+    public void testVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            Random random = new Random();
+            int i = 0;
+            int req;
+            while (true) {
+                req = random.nextInt(MAX_SIZE) + 1;
+                if (req == 1) {
+                    if (fmm.get() != null) {
+                        i += 1;
+                    } else {
+                        break;
+                    }
+                } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+                    i += req;
+                } else {
+                    break;
+                }
+            }
+
+            Assert.assertEquals(i <= NUM_FRAMES, true);
+            Assert.assertEquals(i + req > NUM_FRAMES, true);
+            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+    }
+
+    @org.junit.Test
+    public void testConcurrentVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+
+            VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int allocated = 0;
+            for (int i = 0; i < threads.length; i++) {
+                if (runners[i].cause() != null) {
+                    runners[i].cause().printStackTrace();
+                    Assert.fail(runners[i].cause().getMessage());
+                }
+                allocated += runners[i].getAllocated();
+            }
+            Assert.assertEquals(allocated <= NUM_FRAMES, true);
+            for (int i = 0; i < threads.length; i++) {
+                Assert.assertEquals(allocated + runners[i].getLastReq() > NUM_FRAMES, true);
+            }
+            Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+    }
+
+    @org.junit.Test
+    public void testAcquireReleaseMemoryManager() throws HyracksDataException {
+        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+        Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+        ConcurrentFramePool fmm =
+                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+        Random random = new Random();
+        ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+        while (true) {
+            if (random.nextDouble() < RELEASE_PROBABILITY) {
+                if (!stack.isEmpty()) {
+                    fmm.release(stack.pop());
+                }
+            } else {
+                ByteBuffer buffer = fmm.get();
+                if (buffer == null) {
+                    break;
+                } else {
+                    stack.push(buffer);
+                }
+            }
+        }
+        Assert.assertEquals(stack.size(), NUM_FRAMES);
+        Assert.assertEquals(fmm.remaining(), 0);
+        for (ByteBuffer buffer : stack) {
+            fmm.release(buffer);
+        }
+        stack.clear();
+        Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+    }
+
+    @org.junit.Test
+    public void testConcurrentAcquireReleaseMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (FixedSizeGoodAllocator allocator : runners) {
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+    }
+
+    @org.junit.Test
+    public void testAcquireReleaseVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            Random random = new Random();
+            ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+            int i = 0;
+            int req;
+            while (true) {
+                // release
+                if (random.nextDouble() < RELEASE_PROBABILITY) {
+                    if (!stack.isEmpty()) {
+                        ByteBuffer buffer = stack.pop();
+                        i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+                        fmm.release(buffer);
+                    }
+                } else {
+                    // acquire
+                    req = random.nextInt(MAX_SIZE) + 1;
+                    if (req == 1) {
+                        ByteBuffer buffer = fmm.get();
+                        if (buffer != null) {
+                            stack.push(buffer);
+                            i += 1;
+                        } else {
+                            break;
+                        }
+                    } else {
+                        ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
+                        if (buffer != null) {
+                            stack.push(buffer);
+                            i += req;
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            Assert.assertEquals(i <= NUM_FRAMES, true);
+            Assert.assertEquals(i + req > NUM_FRAMES, true);
+            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+    }
+
+    @org.junit.Test
+    public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (VarSizeGoodAllocator allocator : runners) {
+                if (allocator.cause() != null) {
+                    allocator.cause().printStackTrace();
+                    Assert.fail(allocator.cause().getMessage());
+                }
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+    }
+
+    /*
+     * Runnables used for unit tests
+     */
+    private class FixedSizeAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+
+        public FixedSizeAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        @Override
+        public void run() {
+            while (fmm.get() != null) {
+                allocated++;
+            }
+        }
+    }
+
+    private class FixedSizeGoodAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+        private final Random random = new Random();
+
+        public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return stack.size();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                if (random.nextDouble() < RELEASE_PROBABILITY) {
+                    if (!stack.isEmpty()) {
+                        try {
+                            fmm.release(stack.pop());
+                        } catch (HyracksDataException e) {
+                            Assert.fail();
+                        }
+                    }
+                } else {
+                    ByteBuffer buffer = fmm.get();
+                    if (buffer == null) {
+                        break;
+                    } else {
+                        stack.push(buffer);
+                    }
+                }
+            }
+        }
+    }
+
+    private class VarSizeGoodAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+        private int req = 0;
+        private final Random random = new Random();
+        private Throwable cause;
+        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+
+        public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        public Throwable cause() {
+            return cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    if (random.nextDouble() < RELEASE_PROBABILITY) {
+                        if (!stack.isEmpty()) {
+                            ByteBuffer buffer = stack.pop();
+                            allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+                            fmm.release(buffer);
+                        }
+                    } else {
+                        req = random.nextInt(MAX_SIZE) + 1;
+                        if (req == 1) {
+                            ByteBuffer buffer = fmm.get();
+                            if (buffer != null) {
+                                stack.push(buffer);
+                                allocated += 1;
+                            } else {
+                                break;
+                            }
+                        } else {
+                            ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
+                            if (buffer != null) {
+                                stack.push(buffer);
+                                allocated += req;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+        }
+    }
+
+    private class VarSizeAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+        private int req = 0;
+        private final Random random = new Random();
+        private Throwable cause;
+
+        public VarSizeAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        public int getLastReq() {
+            return req;
+        }
+
+        public Throwable cause() {
+            return cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    req = random.nextInt(MAX_SIZE) + 1;
+                    if (req == 1) {
+                        if (fmm.get() != null) {
+                            allocated += 1;
+                        } else {
+                            break;
+                        }
+                    } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+                        allocated += req;
+                    } else {
+                        break;
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+        }
+    }
+}


Mime
View raw message