asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [4/9] asterixdb git commit: Feed Connection Refactoring
Date Sun, 19 Feb 2017 07:14:50 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index fa48b58..2a87cab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -19,635 +19,143 @@
 package org.apache.asterix.external.feed.management;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveJob;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.feed.api.FeedOperationCounter;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedJoint.State;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
-import org.apache.asterix.external.util.FeedUtils.JobType;
+import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.external.feed.watch.FeedEventSubscriber;
+import org.apache.asterix.external.feed.watch.NoOpSubscriber;
 import org.apache.asterix.runtime.utils.AppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
-import org.apache.log4j.Logger;
 
 public class FeedEventsListener implements IActiveEntityEventsListener {
-    private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
-    private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
-    private final List<IActiveLifecycleEventSubscriber> subscribers;
-    private final Map<Long, ActiveJob> jobs;
-    private final Map<Long, ActiveJob> intakeJobs;
-    private final Map<EntityId, FeedIntakeInfo> entity2Intake;
-    private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
-    private EntityId entityId;
-    private IFeedJoint sourceFeedJoint;
-
-    public FeedEventsListener(EntityId entityId) {
+    // constants
+    private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName());
+    // members
+    private final EntityId entityId;
+    private final List<IDataset> datasets;
+    private final String[] sources;
+    private final List<IActiveEventSubscriber> subscribers;
+    private volatile ActivityState state;
+    private int numRegistered;
+    private JobId jobId;
+
+    public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) {
         this.entityId = entityId;
+        this.datasets = datasets;
+        this.sources = sources;
         subscribers = new ArrayList<>();
-        jobs = new HashMap<>();
-        feedPipeline = new HashMap<>();
-        entity2Intake = new HashMap<>();
-        connectJobInfos = new HashMap<>();
-        intakeJobs = new HashMap<>();
+        state = ActivityState.STOPPED;
     }
 
     @Override
-    public void notify(ActiveEvent event) {
+    public synchronized void notify(ActiveEvent event) {
         try {
             switch (event.getEventKind()) {
-                case JOB_START:
-                    handleJobStartEvent(event);
+                case JOB_STARTED:
+                    start(event);
                     break;
-                case JOB_FINISH:
-                    handleJobFinishEvent(event);
+                case JOB_FINISHED:
+                    finish();
                     break;
                 case PARTITION_EVENT:
-                    handlePartitionStart(event);
+                    partition((ActivePartitionMessage) event.getEventObject());
                     break;
                 default:
-                    LOGGER.warn("Unknown Feed Event" + event);
+                    LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event);
                     break;
             }
+            notifySubscribers(event);
         } catch (Exception e) {
-            LOGGER.error("Unhandled Exception", e);
-        }
-    }
-
-    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
-        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
-        JobType jobType = (JobType) jobInfo.getJobObject();
-        switch (jobType) {
-            case INTAKE:
-                handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
-                break;
-            case FEED_CONNECT:
-                handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
-                break;
-            default:
-        }
-    }
-
-    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
-        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
-        JobType jobType = (JobType) jobInfo.getJobObject();
-        switch (jobType) {
-            case FEED_CONNECT:
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("Collect Job finished for  " + jobInfo);
-                }
-                handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
-                break;
-            case INTAKE:
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
-                }
-                handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
-                break;
-            default:
-                break;
-        }
-    }
-
-    private synchronized void handlePartitionStart(ActiveEvent message) {
-        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
-        JobType jobType = (JobType) jobInfo.getJobObject();
-        switch (jobType) {
-            case FEED_CONNECT:
-                ((FeedConnectJobInfo) jobInfo).partitionStart();
-                if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
-                    notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
-                }
-                break;
-            case INTAKE:
-                handleIntakePartitionStarts(message, jobInfo);
-                break;
-            default:
-                break;
-
-        }
-    }
-
-    private void handleIntakePartitionStarts(ActiveEvent message, ActiveJob jobInfo) {
-        if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) {
-            ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
-            jobInfo.setState(ActivityState.ACTIVE);
-            notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
+            LOGGER.log(Level.SEVERE, "Unhandled Exception", e);
         }
     }
 
-    public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) throws HyracksDataException {
-        Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline = feedPipeline
-                .get(feedJoint.getOwnerFeedId());
-        if (feedJointsOnPipeline == null) {
-            feedJointsOnPipeline = new Pair<>(new FeedOperationCounter(numOfPrividers), new ArrayList<IFeedJoint>());
-            feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
-            feedJointsOnPipeline.second.add(feedJoint);
-        } else {
-            if (!feedJointsOnPipeline.second.contains(feedJoint)) {
-                feedJointsOnPipeline.second.add(feedJoint);
+    private synchronized void notifySubscribers(ActiveEvent event) {
+        notifyAll();
+        Iterator<IActiveEventSubscriber> it = subscribers.iterator();
+        while (it.hasNext()) {
+            IActiveEventSubscriber subscriber = it.next();
+            if (subscriber.done()) {
+                it.remove();
             } else {
-                throw new RuntimeDataException(
-                        ErrorCode.FEED_MANAGEMENT_FEED_EVENT_LISTENER_FEED_JOINT_REGISTERED, feedJoint);
-            }
-        }
-    }
-
-    public synchronized void deregisterFeedIntakeJob(JobId jobId) {
-        FeedIntakeInfo info = (FeedIntakeInfo) intakeJobs.remove(jobId.getId());
-        jobs.remove(jobId.getId());
-        entity2Intake.remove(info.getFeedId());
-        List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second;
-        joints.remove(info.getIntakeFeedJoint());
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Deregistered feed intake job [" + jobId + "]");
-        }
-    }
-
-    private static synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
-        List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<>();
-        Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap();
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-            IOperatorDescriptor opDesc = entry.getValue();
-            if (opDesc instanceof FeedIntakeOperatorDescriptor) {
-                intakeOperatorIds.add(opDesc.getOperatorId());
-            }
-        }
-
-        IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
-        JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
-        List<String> intakeLocations = new ArrayList<>();
-        for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
-            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
-            int nOperatorInstances = operatorLocations.size();
-            for (int i = 0; i < nOperatorInstances; i++) {
-                intakeLocations.add(operatorLocations.get(i));
-            }
-        }
-        // intakeLocations is an ordered list; 
-        // element at position i corresponds to location of i'th instance of operator
-        intakeJobInfo.setIntakeLocation(intakeLocations);
-    }
-
-    public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        if (cInfo != null) {
-            return cInfo.getSourceFeedJoint();
-        }
-        return null;
-    }
-
-    public synchronized void registerFeedIntakeJob(EntityId feedId, JobId jobId, JobSpecification jobSpec)
-            throws HyracksDataException {
-        if (entity2Intake.get(feedId) != null) {
-            throw new RuntimeDataException(
-                    ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_ALREADY_HAVE_INTAKE_JOB);
-        }
-        if (intakeJobs.get(jobId.getId()) != null) {
-            throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_INTAKE_JOB_REGISTERED);
-        }
-        if (jobs.get(jobId.getId()) != null) {
-            throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_FEED_JOB_REGISTERED);
-        }
-
-        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
-        sourceFeedJoint = null;
-        for (IFeedJoint joint : pair.second) {
-            if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
-                sourceFeedJoint = joint;
-                break;
-            }
-        }
-
-        if (sourceFeedJoint != null) {
-            FeedIntakeInfo intakeJobInfo =
-                    new FeedIntakeInfo(jobId, ActivityState.CREATED, feedId, sourceFeedJoint, jobSpec);
-            pair.first.setFeedJobInfo(intakeJobInfo);
-            entity2Intake.put(feedId, intakeJobInfo);
-            jobs.put(jobId.getId(), intakeJobInfo);
-            intakeJobs.put(jobId.getId(), intakeJobInfo);
-
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
-            }
-        } else {
-            throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL, jobId,
-                    feedId);
-        }
-    }
-
-    public synchronized void registerFeedCollectionJob(EntityId sourceFeedId, FeedConnectionId connectionId,
-            JobId jobId, JobSpecification jobSpec, Map<String, String> feedPolicy) throws HyracksDataException {
-        if (jobs.get(jobId.getId()) != null) {
-            throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_FEED_JOB_REGISTERED);
-        }
-        if (connectJobInfos.containsKey(jobId.getId())) {
-            throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_FEED_JOB_REGISTERED);
-        }
-
-        List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second;
-        FeedConnectionId cid = null;
-        IFeedJoint collectionSourceFeedJoint = null;
-        for (IFeedJoint joint : feedJoints) {
-            cid = joint.getReceiver(connectionId);
-            if (cid != null) {
-                collectionSourceFeedJoint = joint;
-                break;
-            }
-        }
-
-        if (cid != null) {
-            FeedConnectJobInfo cInfo = new FeedConnectJobInfo(sourceFeedId, jobId, ActivityState.CREATED, connectionId,
-                    collectionSourceFeedJoint, null, jobSpec, feedPolicy);
-            jobs.put(jobId.getId(), cInfo);
-            connectJobInfos.put(connectionId, cInfo);
-
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
-            }
-        } else {
-            LOGGER.warn(
-                    "Could not register feed collection job [" + jobId + "]" + " for feed connection " + connectionId);
-        }
-    }
-
-    @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
-        FeedConnectionId feedConnectionId = null;
-        Map<String, String> feedPolicy = null;
-        try {
-            for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
-                if (opDesc instanceof FeedCollectOperatorDescriptor) {
-                    feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId();
-                    feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties();
-                    registerFeedCollectionJob(((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(),
-                            feedConnectionId, jobId, spec, feedPolicy);
-                    return;
-                } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
-                    registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(), jobId, spec);
-                    return;
+                subscriber.notify(event);
+                if (subscriber.done()) {
+                    it.remove();
                 }
             }
-        } catch (Exception e) {
-            LOGGER.error(e);
-        }
-    }
-
-    public synchronized List<String> getConnectionLocations(IFeedJoint feedJoint, final FeedConnectionRequest request)
-            throws Exception {
-        List<String> locations = null;
-        switch (feedJoint.getType()) {
-            case COMPUTE:
-                FeedConnectionId connectionId = feedJoint.getProvider();
-                FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-                locations = cInfo.getComputeLocations();
-                break;
-            case INTAKE:
-                FeedIntakeInfo intakeInfo = entity2Intake.get(feedJoint.getOwnerFeedId());
-                locations = intakeInfo.getIntakeLocation();
-                break;
-            default:
-                break;
         }
-        return locations;
     }
 
-    private synchronized void notifyFeedEventSubscribers(ActiveLifecycleEvent event) {
-        if (subscribers != null && !subscribers.isEmpty()) {
-            for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
-                subscriber.handleEvent(event);
+    private void partition(ActivePartitionMessage message) {
+        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+            numRegistered++;
+            if (numRegistered == getSources().length) {
+                state = ActivityState.STARTED;
             }
         }
     }
 
-    private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, ActiveEvent message)
-            throws Exception {
+    private void finish() throws Exception {
         IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
-        JobInfo info = hcc.getJobInfo(message.getJobId());
-        JobStatus status = info.getStatus();
-        EntityId feedId = intakeInfo.getFeedId();
-        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
-        if (status.equals(JobStatus.FAILURE)) {
-            pair.first.setFailedIngestion(true);
-        }
-        // remove feed joints
-        deregisterFeedIntakeJob(message.getJobId());
-        // notify event listeners
-        feedPipeline.remove(feedId);
-        entity2Intake.remove(feedId);
-        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? ActiveLifecycleEvent.FEED_INTAKE_FAILURE
-                : ActiveLifecycleEvent.FEED_INTAKE_ENDED);
-    }
-
-    private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
-        FeedConnectionId connectionId = cInfo.getConnectionId();
-
-        IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
-        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
-        JobStatus status = info.getStatus();
-        boolean failure = status != null && status.equals(JobStatus.FAILURE);
-        FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
-        boolean retainSubsription =
-                cInfo.getState().equals(ActivityState.UNDER_RECOVERY) || (failure && fpa.continueOnHardwareFailure());
-
-        if (!retainSubsription) {
-            IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
-            feedJoint.removeReceiver(connectionId);
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info(
-                        "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
-            }
-        }
-
-        connectJobInfos.remove(connectionId);
-        jobs.remove(cInfo.getJobId().getId());
-        // notify event listeners
-        ActiveLifecycleEvent event =
-                failure ? ActiveLifecycleEvent.FEED_COLLECT_FAILURE : ActiveLifecycleEvent.FEED_COLLECT_ENDED;
-        notifyFeedEventSubscribers(event);
-    }
-
-    public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getStorageLocations();
-    }
-
-    public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getCollectLocations();
+        JobStatus status = hcc.getJobStatus(jobId);
+        state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
     }
 
-    public List<String> getFeedIntakeLocations(EntityId feedId) {
-        return entity2Intake.get(feedId).getIntakeLocation();
+    private void start(ActiveEvent event) {
+        this.jobId = event.getJobId();
+        state = ActivityState.STARTING;
     }
 
-    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getJobId();
-    }
-
-    public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
-        List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
-                ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
-        if (joints != null && !joints.isEmpty()) {
-            for (IFeedJoint joint : joints) {
-                if (joint.getFeedJointKey().equals(feedJointKey)) {
-                    return true;
-                }
-            }
-        }
-        return false;
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
     }
 
-    public Collection<IFeedJoint> getFeedIntakeJoints() {
-        List<IFeedJoint> intakeFeedPoints = new ArrayList<>();
-        for (FeedIntakeInfo info : entity2Intake.values()) {
-            intakeFeedPoints.add(info.getIntakeFeedJoint());
-        }
-        return intakeFeedPoints;
+    @Override
+    public ActivityState getState() {
+        return state;
     }
 
-    public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
-        List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId())
-                ? feedPipeline.get(feedPointKey.getFeedId()).second : null;
-        if (joints != null && !joints.isEmpty()) {
-            for (IFeedJoint joint : joints) {
-                if (joint.getFeedJointKey().equals(feedPointKey)) {
-                    return joint;
-                }
-            }
+    @Override
+    public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException {
+        if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
+            throw new HyracksDataException("Can only wait for STARTED or STOPPED state");
         }
-        return null;
-    }
-
-    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
-        IFeedJoint feedJoint = getFeedJoint(feedJointKey);
-        if (feedJoint != null) {
-            return feedJoint;
-        } else {
-            String jointKeyString = feedJointKey.getStringRep();
-            List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId())
-                    ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
-            IFeedJoint candidateJoint = null;
-            if (jointsOnPipeline != null) {
-                for (IFeedJoint joint : jointsOnPipeline) {
-                    if (jointKeyString.contains(joint.getFeedJointKey().getStringRep()) && (candidateJoint == null
-                            || /*found feed point is a super set of the earlier find*/joint.getFeedJointKey()
-                                    .getStringRep().contains(candidateJoint.getFeedJointKey().getStringRep()))) {
-                        candidateJoint = joint;
-                    }
-                }
+        synchronized (this) {
+            if (this.state == ActivityState.FAILED) {
+                throw new HyracksDataException("Feed has failed");
+            } else if (this.state == state) {
+                return NoOpSubscriber.INSTANCE;
             }
-            return candidateJoint;
+            return doSubscribe(state);
         }
     }
 
-    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getSpec();
-    }
-
-    public IFeedJoint getFeedPoint(EntityId sourceFeedId, IFeedJoint.FeedJointType type) {
-        List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second;
-        for (IFeedJoint joint : joints) {
-            if (joint.getType().equals(type)) {
-                return joint;
-            }
-        }
-        return null;
-    }
-
-    private void setLocations(FeedConnectJobInfo cInfo) {
-        JobSpecification jobSpec = cInfo.getSpec();
-
-        List<OperatorDescriptorId> collectOperatorIds = new ArrayList<>();
-        List<OperatorDescriptorId> computeOperatorIds = new ArrayList<>();
-        List<OperatorDescriptorId> storageOperatorIds = new ArrayList<>();
-
-        Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-            IOperatorDescriptor opDesc = entry.getValue();
-            IOperatorDescriptor actualOp;
-            if (opDesc instanceof FeedMetaOperatorDescriptor) {
-                actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
-            } else {
-                actualOp = opDesc;
-            }
-
-            if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
-                AlgebricksMetaOperatorDescriptor op = (AlgebricksMetaOperatorDescriptor) actualOp;
-                IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
-                boolean computeOp = false;
-                for (IPushRuntimeFactory rf : runtimeFactories) {
-                    if (rf instanceof AssignRuntimeFactory) {
-                        IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0);
-                        IOperatorDescriptor sourceOp =
-                                jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId()).getLeft().getLeft();
-                        if (sourceOp instanceof FeedCollectOperatorDescriptor) {
-                            computeOp = true;
-                            break;
-                        }
-                    }
-                }
-                if (computeOp) {
-                    computeOperatorIds.add(entry.getKey());
-                }
-            } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
-                storageOperatorIds.add(entry.getKey());
-            } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
-                collectOperatorIds.add(entry.getKey());
-            }
-        }
-
-        try {
-            IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
-            JobInfo info = hcc.getJobInfo(cInfo.getJobId());
-            List<String> collectLocations = new ArrayList<>();
-            for (OperatorDescriptorId collectOpId : collectOperatorIds) {
-                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId);
-                int nOperatorInstances = operatorLocations.size();
-                for (int i = 0; i < nOperatorInstances; i++) {
-                    collectLocations.add(operatorLocations.get(i));
-                }
-            }
-
-            List<String> computeLocations = new ArrayList<>();
-            for (OperatorDescriptorId computeOpId : computeOperatorIds) {
-                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
-                if (operatorLocations != null) {
-                    int nOperatorInstances = operatorLocations.size();
-                    for (int i = 0; i < nOperatorInstances; i++) {
-                        computeLocations.add(operatorLocations.get(i));
-                    }
-                } else {
-                    computeLocations.clear();
-                    computeLocations.addAll(collectLocations);
-                }
-            }
-
-            List<String> storageLocations = new ArrayList<>();
-            for (OperatorDescriptorId storageOpId : storageOperatorIds) {
-                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
-                if (operatorLocations == null) {
-                    continue;
-                }
-                int nOperatorInstances = operatorLocations.size();
-                for (int i = 0; i < nOperatorInstances; i++) {
-                    storageLocations.add(operatorLocations.get(i));
-                }
-            }
-            cInfo.setCollectLocations(collectLocations);
-            cInfo.setComputeLocations(computeLocations);
-            cInfo.setStorageLocations(storageLocations);
-
-        } catch (Exception e) {
-            LOGGER.error("Error while setting feed active locations", e);
-        }
-
-    }
-
-    public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+    // Called within synchronized block
+    private FeedEventSubscriber doSubscribe(ActivityState state) {
+        FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state);
         subscribers.add(subscriber);
-    }
-
-    public void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
-        subscribers.remove(subscriber);
-    }
-
-    public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
-            IActiveLifecycleEventSubscriber eventSubscriber) {
-        boolean active = false;
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        if (cInfo != null) {
-            active = cInfo.getState().equals(ActivityState.ACTIVE);
-        }
-        if (active) {
-            registerFeedEventSubscriber(eventSubscriber);
-        }
-        return active;
-    }
-
-    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId);
-    }
-
-    private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws ACIDException {
-        // set locations of feed sub-operations (intake, compute, store)
-        setLocations(cInfo);
-        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
-        // activate joints
-        List<IFeedJoint> joints = pair.second;
-        for (IFeedJoint joint : joints) {
-            if (joint.getProvider().equals(cInfo.getConnectionId())) {
-                joint.setState(State.ACTIVE);
-                if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
-                    cInfo.setComputeFeedJoint(joint);
-                }
-            }
-        }
-        cInfo.setState(ActivityState.ACTIVE);
-    }
-
-    private synchronized boolean isConnectedToDataset(String datasetName) {
-        for (FeedConnectionId connection : connectJobInfos.keySet()) {
-            if (connection.getDatasetName().equals(datasetName)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public FeedConnectionId[] getConnections() {
-        return connectJobInfos.keySet().toArray(new FeedConnectionId[connectJobInfos.size()]);
-    }
-
-    public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
-        return isFeedPointAvailable(feedJointKey);
+        return subscriber;
     }
 
     @Override
-    public boolean isEntityActive() {
-        return !jobs.isEmpty();
+    public boolean isEntityUsingDataset(IDataset dataset) {
+        return datasets.contains(dataset);
     }
 
-    @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    public IFeedJoint getSourceFeedJoint() {
-        return sourceFeedJoint;
-    }
-
-    @Override
-    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
-        return isConnectedToDataset(datasetName);
+    public String[] getSources() {
+        return sources;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java
deleted file mode 100644
index 93f81d9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedInfo {
-    public JobSpecification jobSpec;
-    public JobInfo jobInfo;
-    public JobId jobId;
-    public FeedInfoType infoType;
-    public State state;
-
-    public enum State {
-        ACTIVE,
-        INACTIVE
-    }
-
-    public enum FeedInfoType {
-        INTAKE,
-        COLLECT
-    }
-
-    public FeedInfo(JobSpecification jobSpec, JobId jobId, FeedInfoType infoType) {
-        this.jobSpec = jobSpec;
-        this.jobId = jobId;
-        this.infoType = infoType;
-        this.state = State.INACTIVE;
-    }
-
-    @Override
-    public String toString() {
-        return " job id " + jobId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
deleted file mode 100644
index 2905bb2..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Represents a unique identifier for a Feed Joint. A Feed joint is a logical entity located
- * along a feed ingestion pipeline at a point where the tuples moving as part of data flow
- * constitute the feed. The feed joint acts as a network tap and allows the flowing data to be
- * routed to multiple paths.
- */
-public class FeedJointKey implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private final EntityId primaryFeedId;
-    private final List<String> appliedFunctions;
-    private final String stringRep;
-
-    public FeedJointKey(EntityId feedId, List<String> appliedFunctions) {
-        this.primaryFeedId = feedId;
-        this.appliedFunctions = appliedFunctions;
-        StringBuilder builder = new StringBuilder();
-        builder.append(feedId);
-        builder.append(":");
-        builder.append(StringUtils.join(appliedFunctions, ':'));
-        stringRep = builder.toString();
-    }
-
-    public EntityId getFeedId() {
-        return primaryFeedId;
-    }
-
-    public List<String> getAppliedFunctions() {
-        return appliedFunctions;
-    }
-
-    public String getStringRep() {
-        return stringRep;
-    }
-
-    @Override
-    public final String toString() {
-        return stringRep;
-    }
-
-    @Override
-    public int hashCode() {
-        return stringRep.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || !(o instanceof FeedJointKey)) {
-            return false;
-        }
-        return stringRep.equals(((FeedJointKey) o).stringRep);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java
deleted file mode 100644
index bab4376..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.asterix.external.feed.api.IFeedWork;
-import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
-import org.apache.asterix.external.feed.api.IFeedWorkManager;
-
-/**
- * Handles asynchronous execution of feed management related tasks.
- */
-public class FeedWorkManager implements IFeedWorkManager {
-
-    public static final FeedWorkManager INSTANCE = new FeedWorkManager();
-
-    private final ExecutorService executorService = Executors.newCachedThreadPool();
-
-    private FeedWorkManager() {
-    }
-
-    public void submitWork(IFeedWork work, IFeedWorkEventListener listener) {
-        Runnable runnable = work.getRunnable();
-        try {
-            executorService.execute(runnable);
-            listener.workCompleted(work);
-        } catch (Exception e) {
-            listener.workFailed(work, e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
deleted file mode 100644
index 7a3a376..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * @deprecated A feed control message indicating the need to end the feed. This message is dispatched
- *             to all locations that host an operator involved in the feed pipeline.
- *             Instead, use IMessageBroker messages
- */
-@Deprecated
-public class EndFeedMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final EntityId sourceFeedId;
-
-    private final FeedConnectionId connectionId;
-
-    private final FeedRuntimeType sourceRuntimeType;
-
-    private final boolean completeDisconnection;
-
-    private final EndMessageType endMessageType;
-
-    public enum EndMessageType {
-        DISCONNECT_FEED,
-        DISCONTINUE_SOURCE
-    }
-
-    public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, EntityId sourceFeedId,
-            boolean completeDisconnection, EndMessageType endMessageType) {
-        super(MessageType.END);
-        this.connectionId = connectionId;
-        this.sourceRuntimeType = sourceRuntimeType;
-        this.sourceFeedId = sourceFeedId;
-        this.completeDisconnection = completeDisconnection;
-        this.endMessageType = endMessageType;
-    }
-
-    @Override
-    public String toString() {
-        return MessageType.END.name() + "  " + connectionId + " [" + sourceRuntimeType + "] ";
-    }
-
-    public FeedRuntimeType getSourceRuntimeType() {
-        return sourceRuntimeType;
-    }
-
-    public EntityId getSourceFeedId() {
-        return sourceFeedId;
-    }
-
-    public boolean isCompleteDisconnection() {
-        return completeDisconnection;
-    }
-
-    public EndMessageType getEndMessageType() {
-        return endMessageType;
-    }
-
-    @Override
-    public ObjectNode toJSON()  {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode obj = om.createObjectNode();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getEntityName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        return obj;
-    }
-
-    public FeedConnectionId getFeedConnectionId() {
-        return connectionId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
deleted file mode 100644
index 4f57fb5..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.apache.asterix.active.IActiveMessage;
-import org.apache.hyracks.api.dataflow.value.JSONSerializable;
-
-/**
- * A control message that can be sent to the runtime instance of a
- * feed's adapter.
- */
-public abstract class FeedMessage implements IActiveMessage, JSONSerializable {
-
-    private static final long serialVersionUID = 1L;
-
-    protected final MessageType messageType;
-
-    public FeedMessage(MessageType messageType) {
-        this.messageType = messageType;
-    }
-
-    public MessageType getMessageType() {
-        return messageType;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
deleted file mode 100644
index 821a0b1..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.Map;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.IActiveRuntime;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Represents the feed runtime that collects feed tuples from another feed.
- * In case of a primary feed, the CollectionRuntime collects tuples from the feed
- * intake job. For a secondary feed, tuples are collected from the intake/compute
- * runtime associated with the source feed.
- */
-public class CollectionRuntime implements IActiveRuntime {
-
-    private final FeedConnectionId connectionId;        // [Dataverse - Feed - Dataset]
-    private final ISubscribableRuntime sourceRuntime;   // Runtime that provides the data
-    private final Map<String, String> feedPolicy;       // Policy associated with the feed
-    private final FeedFrameCollector frameCollector;    // Collector that can be plugged into a frame distributor
-    private final IHyracksTaskContext ctx;
-    private final ActiveRuntimeId runtimeId;
-
-    public CollectionRuntime(FeedConnectionId connectionId, ActiveRuntimeId runtimeId,
-            ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy, IHyracksTaskContext ctx,
-            FeedFrameCollector frameCollector) {
-        this.runtimeId = runtimeId;
-        this.connectionId = connectionId;
-        this.sourceRuntime = sourceRuntime;
-        this.feedPolicy = feedPolicy;
-        this.ctx = ctx;
-        this.frameCollector = frameCollector;
-    }
-
-    public void waitTillCollectionOver() throws InterruptedException {
-        if (!(isCollectionOver())) {
-            synchronized (frameCollector) {
-                while (!isCollectionOver()) {
-                    frameCollector.wait();
-                }
-            }
-        }
-    }
-
-    private boolean isCollectionOver() {
-        return frameCollector.getState().equals(FeedFrameCollector.State.FINISHED)
-                || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
-    }
-
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public ISubscribableRuntime getSourceRuntime() {
-        return sourceRuntime;
-    }
-
-    public FeedFrameCollector getFrameCollector() {
-        return frameCollector;
-    }
-
-    public IHyracksTaskContext getCtx() {
-        return ctx;
-    }
-
-    @Override
-    public ActiveRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    @Override
-    public void stop() throws HyracksDataException, InterruptedException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index d32a604..3100704 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -19,79 +19,43 @@
 package org.apache.asterix.external.feed.runtime;
 
 import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
-public class IngestionRuntime extends SubscribableRuntime {
+public class IngestionRuntime implements IActiveRuntime {
+
+    private static final Logger LOGGER = Logger.getLogger(IngestionRuntime.class.getName());
 
     private final AdapterRuntimeManager adapterRuntimeManager;
-    private final IHyracksTaskContext ctx;
-    private int numSubscribers = 0;
+    private final ActiveRuntimeId runtimeId;
+    private final EntityId feedId;
 
-    public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
-            AdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
-        super(entityId, runtimeId, feedWriter);
+    public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId, AdapterRuntimeManager adaptorRuntimeManager) {
+        this.feedId = entityId;
+        this.runtimeId = runtimeId;
         this.adapterRuntimeManager = adaptorRuntimeManager;
-        this.ctx = ctx;
     }
 
     @Override
-    public synchronized void subscribe(CollectionRuntime collectionRuntime) throws HyracksDataException {
-        FeedFrameCollector collector = collectionRuntime.getFrameCollector();
-        dWriter.subscribe(collector);
-        subscribers.add(collectionRuntime);
-        if (numSubscribers == 0) {
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE,
-                    TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx());
-            start();
-        }
-        numSubscribers++;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
-        }
-    }
-
-    @Override
-    public synchronized void unsubscribe(CollectionRuntime collectionRuntime) throws InterruptedException {
-        numSubscribers--;
-        if (numSubscribers == 0) {
-            stop();
-        }
-        subscribers.remove(collectionRuntime);
-    }
-
-    public AdapterRuntimeManager getAdapterRuntimeManager() {
-        return adapterRuntimeManager;
-    }
-
-    public void terminate() {
-        for (IActiveRuntime subscriber : subscribers) {
-            try {
-                unsubscribe((CollectionRuntime) subscriber);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Excpetion in unsubscribing " + subscriber + " message " + e.getMessage());
-                }
-            }
-        }
+    public ActiveRuntimeId getRuntimeId() {
+        return this.runtimeId;
     }
 
     public void start() {
         adapterRuntimeManager.start();
+        LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " running on partition " + runtimeId);
     }
 
     @Override
     public void stop() throws InterruptedException {
         adapterRuntimeManager.stop();
+        LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId);
+    }
+
+    public EntityId getFeedId() {
+        return feedId;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
deleted file mode 100644
index fb70fdb..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveRuntime;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-
-public abstract class SubscribableRuntime implements ISubscribableRuntime {
-
-    protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
-    protected final EntityId feedId;
-    protected final List<IActiveRuntime> subscribers;
-    protected final DistributeFeedFrameWriter dWriter;
-    protected final ActiveRuntimeId runtimeId;
-
-    public SubscribableRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) {
-        this.runtimeId = runtimeId;
-        this.feedId = feedId;
-        this.dWriter = dWriter;
-        this.subscribers = new ArrayList<>();
-    }
-
-    @Override
-    public ActiveRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    public EntityId getFeedId() {
-        return feedId;
-    }
-
-    @Override
-    public String toString() {
-        return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
deleted file mode 100644
index 82cdddf..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.active.ActiveJob;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedUtils.JobType;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedConnectJobInfo extends ActiveJob {
-
-    private static final long serialVersionUID = 1L;
-    private final FeedConnectionId connectionId;
-    private final Map<String, String> feedPolicy;
-    private final IFeedJoint sourceFeedJoint;
-    private IFeedJoint computeFeedJoint;
-
-    private List<String> collectLocations;
-    private List<String> computeLocations;
-    private List<String> storageLocations;
-    private int partitionStarts = 0;
-
-    public FeedConnectJobInfo(EntityId entityId, JobId jobId, ActivityState state, FeedConnectionId connectionId,
-            IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
-            Map<String, String> feedPolicy) {
-        super(entityId, jobId, state, JobType.FEED_CONNECT, spec);
-        this.connectionId = connectionId;
-        this.sourceFeedJoint = sourceFeedJoint;
-        this.computeFeedJoint = computeFeedJoint;
-        this.feedPolicy = feedPolicy;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public List<String> getCollectLocations() {
-        return collectLocations;
-    }
-
-    public List<String> getComputeLocations() {
-        return computeLocations;
-    }
-
-    public List<String> getStorageLocations() {
-        return storageLocations;
-    }
-
-    public void setCollectLocations(List<String> collectLocations) {
-        this.collectLocations = collectLocations;
-    }
-
-    public void setComputeLocations(List<String> computeLocations) {
-        this.computeLocations = computeLocations;
-    }
-
-    public void setStorageLocations(List<String> storageLocations) {
-        this.storageLocations = storageLocations;
-    }
-
-    public IFeedJoint getSourceFeedJoint() {
-        return sourceFeedJoint;
-    }
-
-    public IFeedJoint getComputeFeedJoint() {
-        return computeFeedJoint;
-    }
-
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
-        this.computeFeedJoint = computeFeedJoint;
-    }
-
-    public void partitionStart() {
-        partitionStarts++;
-    }
-
-    public boolean collectionStarted() {
-        return partitionStarts == collectLocations.size();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
new file mode 100644
index 0000000..0e931f7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.external.feed.management.FeedEventsListener;
+
+public class FeedEventSubscriber implements IActiveEventSubscriber {
+
+    private final FeedEventsListener listener;
+    private final ActivityState state;
+    private boolean done = false;
+
+    public FeedEventSubscriber(FeedEventsListener listener, ActivityState state) {
+        this.listener = listener;
+        this.state = state;
+
+    }
+
+    @Override
+    public synchronized void notify(ActiveEvent event) {
+        if (listener.getState() == state || listener.getState() == ActivityState.FAILED
+                || listener.getState() == ActivityState.STOPPED) {
+            done = true;
+            notifyAll();
+        }
+    }
+
+    @Override
+    public synchronized boolean done() {
+        return done;
+    }
+
+    @Override
+    public synchronized void sync() throws InterruptedException {
+        while (!done) {
+            wait();
+        }
+    }
+
+    @Override
+    public synchronized void unsubscribe() {
+        done = true;
+        notifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
deleted file mode 100644
index 4114e82..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.List;
-
-import org.apache.asterix.active.ActiveJob;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.util.FeedUtils.JobType;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedIntakeInfo extends ActiveJob {
-
-    private static final long serialVersionUID = 1L;
-    private final EntityId feedId;
-    private final IFeedJoint intakeFeedJoint;
-    private final JobSpecification spec;
-    private List<String> intakeLocation;
-
-    public FeedIntakeInfo(JobId jobId, ActivityState state, EntityId feedId, IFeedJoint intakeFeedJoint,
-            JobSpecification spec) {
-        super(feedId, jobId, state, JobType.INTAKE, spec);
-        this.feedId = feedId;
-        this.intakeFeedJoint = intakeFeedJoint;
-        this.spec = spec;
-    }
-
-    public EntityId getFeedId() {
-        return feedId;
-    }
-
-    public IFeedJoint getIntakeFeedJoint() {
-        return intakeFeedJoint;
-    }
-
-    @Override
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public List<String> getIntakeLocation() {
-        return intakeLocation;
-    }
-
-    public void setIntakeLocation(List<String> intakeLocation) {
-        this.intakeLocation = intakeLocation;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
new file mode 100644
index 0000000..9d8c570
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.IActiveEventSubscriber;
+
+/**
+ * An event subscriber that does not listen to any events
+ */
+public class NoOpSubscriber implements IActiveEventSubscriber {
+
+    public static final NoOpSubscriber INSTANCE = new NoOpSubscriber();
+
+    private NoOpSubscriber() {
+    }
+
+    @Override
+    public void notify(ActiveEvent event) {
+        // do nothing
+    }
+
+    @Override
+    public boolean done() {
+        return true;
+    }
+
+    @Override
+    public void sync() {
+        // do nothing
+    }
+
+    @Override
+    public void unsubscribe() {
+        // do nothing
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 570155c..4d8be98 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -48,6 +48,15 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<String>
     private Map<String, String> configuration;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
+    public static boolean isTwitterPull(Map<String, String> configuration) {
+        String reader = configuration.get(ExternalDataConstants.KEY_READER);
+        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
+                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 5337be1..6a581ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.input.stream.factory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
@@ -109,7 +110,8 @@ public class SocketServerInputStreamFactory implements IInputStreamFactory {
         try {
             Pair<String, Integer> socket = sockets.get(partition);
             ServerSocket server;
-            server = new ServerSocket(socket.second);
+            server = new ServerSocket();
+            server.bind(new InetSocketAddress(socket.second));
             return new SocketServerInputStream(server);
         } catch (IOException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 4ecb887..c4cb650 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -20,11 +20,6 @@ package org.apache.asterix.external.operators;
 
 import java.util.Map;
 
-import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.om.types.ARecordType;
@@ -55,21 +50,17 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     /** Map representation of policy parameters */
     private final Map<String, String> feedPolicyProperties;
 
-    /** The source feed from which the feed derives its data from. **/
-    private final EntityId sourceFeedId;
-
     /** The subscription location at which the recipient feed receives tuples from the source feed {SOURCE_FEED_INTAKE_STAGE , SOURCE_FEED_COMPUTE_STAGE} **/
     private final FeedRuntimeType subscriptionLocation;
 
-    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
-            EntityId sourceFeedId, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, ARecordType atype,
+            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
             FeedRuntimeType subscriptionLocation) {
-        super(spec, 0, 1);
+        super(spec, 1, 1);
         this.recordDescriptors[0] = rDesc;
         this.outputType = atype;
         this.connectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
-        this.sourceFeedId = sourceFeedId;
         this.subscriptionLocation = subscriptionLocation;
     }
 
@@ -77,11 +68,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        ActiveManager feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getActiveManager();
-        ActiveRuntimeId sourceRuntimeId = new ActiveRuntimeId(sourceFeedId, subscriptionLocation.toString(), partition);
-        ISubscribableRuntime sourceRuntime = (ISubscribableRuntime) feedManager.getRuntime(sourceRuntimeId);
-        return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition, sourceRuntime);
+        return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition);
     }
 
     public FeedConnectionId getFeedConnectionId() {
@@ -100,10 +87,6 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
         return recordDescriptors[0];
     }
 
-    public EntityId getSourceFeedId() {
-        return sourceFeedId;
-    }
-
     public FeedRuntimeType getSubscriptionLocation() {
         return subscriptionLocation;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index d7fa590..384da84 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -18,46 +18,38 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /**
  * The first operator in a collect job in a feed.
  */
-public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+public class FeedCollectOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private final int partition;
     private final FeedConnectionId connectionId;
-    private final Map<String, String> feedPolicy;
     private final FeedPolicyAccessor policyAccessor;
     private final ActiveManager activeManager;
-    private final ISubscribableRuntime sourceRuntime;
     private final IHyracksTaskContext ctx;
-    private CollectionRuntime collectRuntime;
 
     public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicy, int partition, ISubscribableRuntime sourceRuntime) {
+            Map<String, String> feedPolicy, int partition) {
         this.ctx = ctx;
         this.partition = partition;
         this.connectionId = feedConnectionId;
-        this.sourceRuntime = sourceRuntime;
-        this.feedPolicy = feedPolicy;
         this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
         this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getActiveManager();
@@ -68,7 +60,6 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
         try {
             ActiveRuntimeId runtimeId =
                     new ActiveRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT.toString(), partition);
-            // Does this collector have a handler?
             FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
             if (policyAccessor.bufferingEnabled()) {
                 writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
@@ -76,17 +67,33 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
             } else {
                 writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor);
             }
-            collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx,
-                    new FeedFrameCollector(policyAccessor, writer, connectionId));
-            activeManager.registerRuntime(collectRuntime);
-            sourceRuntime.subscribe(collectRuntime);
-            // Notify CC that Collection started
-            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
-            collectRuntime.waitTillCollectionOver();
-            activeManager.deregisterRuntime(collectRuntime.getRuntimeId());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        writer.nextFrame(buffer);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
 }


Mime
View raw message