asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [25/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
Date Thu, 14 Jan 2016 20:32:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java
new file mode 100644
index 0000000..b42ef1e
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedJobNotificationHandler.java
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.api.IFeedJoint.State;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedJointKey;
+import org.apache.asterix.external.feed.management.FeedWorkManager;
+import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+import org.apache.asterix.external.feed.watch.FeedJobInfo;
+import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
+import org.apache.asterix.external.feed.watch.FeedJobInfo.JobType;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.feed.FeedLifecycleListener.Message;
+import org.apache.asterix.feed.FeedWorkCollection.SubscribeFeedWork;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
+
+public class FeedJobNotificationHandler implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
+
+    /** Queue of job notifications consumed by {@link #run()}. */
+    private final LinkedBlockingQueue<Message> inbox;
+    /** Lifecycle event subscribers, keyed by the feed connection they registered for. */
+    private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
+
+    /** Every registered feed job (intake and connect), keyed by job id. */
+    private final Map<JobId, FeedJobInfo> jobInfos;
+    /** Registered intake jobs, keyed by feed id. */
+    private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
+    /** Registered connect (collect) jobs, keyed by feed connection id. */
+    private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
+    /** Feed joints registered per owning feed. */
+    private final Map<FeedId, List<IFeedJoint>> feedPipeline;
+    /** Per connection: the progress tracker paired with the last timestamp reported to it (starts at 0). */
+    private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
+
+    /**
+     * Creates a handler that reads job notifications from the given queue. All
+     * bookkeeping maps start out empty.
+     *
+     * @param inbox
+     *            queue from which {@link #run()} takes job start/finish messages
+     */
+    public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
+        this.inbox = inbox;
+        eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
+        jobInfos = new HashMap<JobId, FeedJobInfo>();
+        intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>();
+        connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
+        feedPipeline = new HashMap<FeedId, List<IFeedJoint>>();
+        feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
+    }
+
+    /**
+     * Takes messages from the inbox one at a time and dispatches them to the job
+     * start / job finish handlers. A failure while handling one message is logged
+     * and does not stop the loop. The loop ends when the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Message mesg = inbox.take();
+                switch (mesg.messageKind) {
+                    case JOB_START:
+                        handleJobStartMessage(mesg);
+                        break;
+                    case JOB_FINISH:
+                        handleJobFinishMessage(mesg);
+                        break;
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the loop condition (and any caller) sees it;
+                // swallowing it here would make this thread impossible to stop.
+                Thread.currentThread().interrupt();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Feed job notification handler interrupted; stopping");
+                }
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to handle feed job notification", e);
+            }
+        }
+    }
+
+    /**
+     * Registers a progress tracker for a feed connection. The last reported
+     * timestamp for the connection starts at 0.
+     *
+     * @param connectionId
+     *            connection the tracker belongs to
+     * @param feedIntakeProgressTracker
+     *            tracker to notify from {@link #updateTrackingInformation(StorageReportFeedMessage)}
+     * @throws IllegalStateException
+     *             if a tracker is already registered for the connection
+     */
+    public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
+            IIntakeProgressTracker feedIntakeProgressTracker) {
+        if (feedIntakeProgressTrackers.get(connectionId) != null) {
+            throw new IllegalStateException(" Progress tracker for connection " + connectionId
+                    + " is already registered");
+        }
+        feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>(
+                feedIntakeProgressTracker, 0L));
+    }
+
+    /**
+     * Removes the progress tracker registered for the given connection, if any.
+     *
+     * @param connectionId
+     *            connection whose tracker is dropped
+     */
+    public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
+        feedIntakeProgressTrackers.remove(connectionId);
+    }
+
+    /**
+     * Forwards a storage report to the tracker registered for the report's
+     * connection, but only when the reported timestamp is newer than the last
+     * one recorded. Reports for connections without a tracker are ignored.
+     *
+     * @param srm
+     *            storage report carrying the connection id and the last persisted tuple intake timestamp
+     */
+    public void updateTrackingInformation(StorageReportFeedMessage srm) {
+        Pair<IIntakeProgressTracker, Long> tracked = feedIntakeProgressTrackers.get(srm.getConnectionId());
+        if (tracked == null) {
+            return;
+        }
+        if (tracked.second < srm.getLastPersistedTupleIntakeTimestamp()) {
+            // remember the newer timestamp before notifying so the next report is compared against it
+            tracked.second = srm.getLastPersistedTupleIntakeTimestamp();
+            tracked.first.notifyIngestedTupleTimestamp(tracked.second);
+        }
+    }
+
+    /**
+     * @return the values view of the registered intake jobs; it is backed by the
+     *         internal map, not a copy
+     */
+    public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
+        return intakeJobInfos.values();
+    }
+
+    /**
+     * @return the values view of the registered connect jobs; it is backed by the
+     *         internal map, not a copy
+     */
+    public Collection<FeedConnectJobInfo> getFeedConnectInfos() {
+        return connectJobInfos.values();
+    }
+
+    /**
+     * Adds a feed joint to the pipeline of its owner feed, creating the pipeline
+     * entry on first use.
+     *
+     * @param feedJoint
+     *            joint to register
+     * @throws IllegalArgumentException
+     *             if the joint is already registered for its owner feed
+     */
+    public void registerFeedJoint(IFeedJoint feedJoint) {
+        FeedId ownerFeedId = feedJoint.getOwnerFeedId();
+        List<IFeedJoint> registered = feedPipeline.get(ownerFeedId);
+        if (registered == null) {
+            registered = new ArrayList<IFeedJoint>();
+            feedPipeline.put(ownerFeedId, registered);
+        } else if (registered.contains(feedJoint)) {
+            throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
+        }
+        registered.add(feedJoint);
+    }
+
+    /**
+     * Registers an intake job for a feed. The job is recorded in state
+     * {@code CREATED} and bound to the feed's joint of type {@code INTAKE}.
+     *
+     * @param feedId
+     *            feed the intake job belongs to
+     * @param jobId
+     *            id of the intake job
+     * @param jobSpec
+     *            specification of the intake job
+     * @throws IllegalStateException
+     *             if a job with this id is already registered
+     * @throws HyracksDataException
+     *             if the feed has no registered intake joint
+     */
+    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException {
+        if (jobInfos.get(jobId) != null) {
+            throw new IllegalStateException("Feed job already registered");
+        }
+
+        // A feed with no registered joints has no pipeline entry at all; treat that
+        // the same as "no intake joint" instead of failing with a NullPointerException.
+        List<IFeedJoint> joints = feedPipeline.get(feedId);
+        IFeedJoint intakeJoint = null;
+        if (joints != null) {
+            for (IFeedJoint joint : joints) {
+                if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
+                    intakeJoint = joint;
+                    break;
+                }
+            }
+        }
+
+        if (intakeJoint == null) {
+            throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed  "
+                    + feedId);
+        }
+
+        FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE,
+                feedId, intakeJoint, jobSpec);
+        intakeJobInfos.put(feedId, intakeJobInfo);
+        jobInfos.put(jobId, intakeJobInfo);
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
+        }
+    }
+
+    /**
+     * Registers a collect (connect) job for a feed connection. The source joint is
+     * the first joint of the source feed that reports the connection as a
+     * receiver. If no such joint exists, a warning is logged and nothing is
+     * registered.
+     *
+     * @param sourceFeedId
+     *            feed whose joints are searched for the connection
+     * @param connectionId
+     *            connection served by the job
+     * @param jobId
+     *            id of the collect job
+     * @param jobSpec
+     *            specification of the collect job
+     * @param feedPolicy
+     *            policy parameters stored with the job info
+     * @throws IllegalStateException
+     *             if a job with this id is already registered
+     */
+    public void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
+            JobSpecification jobSpec, Map<String, String> feedPolicy) {
+        if (jobInfos.get(jobId) != null) {
+            throw new IllegalStateException("Feed job already registered");
+        }
+
+        // The source feed may have no pipeline entry yet; fall through to the
+        // warning below rather than failing with a NullPointerException.
+        List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId);
+        IFeedJoint sourceFeedJoint = null;
+        if (feedJoints != null) {
+            for (IFeedJoint joint : feedJoints) {
+                if (joint.getReceiver(connectionId) != null) {
+                    sourceFeedJoint = joint;
+                    break;
+                }
+            }
+        }
+
+        if (sourceFeedJoint != null) {
+            FeedConnectJobInfo cInfo = new FeedConnectJobInfo(jobId, FeedJobState.CREATED, connectionId,
+                    sourceFeedJoint, null, jobSpec, feedPolicy);
+            jobInfos.put(jobId, cInfo);
+            connectJobInfos.put(connectionId, cInfo);
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
+            }
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Could not register feed collection job [" + jobId + "]" + " for feed connection "
+                        + connectionId);
+            }
+        }
+
+    }
+
+    /**
+     * Removes an intake job from the bookkeeping maps. Unless the job is in state
+     * {@code UNDER_RECOVERY}, its intake joint is also removed from the feed's
+     * pipeline.
+     *
+     * @param jobId
+     *            id of the intake job
+     * @throws IllegalStateException
+     *             if no intake job is registered under this id
+     */
+    public void deregisterFeedIntakeJob(JobId jobId) {
+        FeedJobInfo jobInfo = jobInfos.get(jobId);
+        // instanceof is false for null, so this covers both "unknown job" and
+        // "job registered, but not as an intake job".
+        if (!(jobInfo instanceof FeedIntakeInfo)) {
+            throw new IllegalStateException(" Feed Intake job not registered ");
+        }
+
+        FeedIntakeInfo info = (FeedIntakeInfo) jobInfo;
+        jobInfos.remove(jobId);
+        intakeJobInfos.remove(info.getFeedId());
+
+        if (!info.getState().equals(FeedJobState.UNDER_RECOVERY)) {
+            List<IFeedJoint> joints = feedPipeline.get(info.getFeedId());
+            if (joints != null) {
+                joints.remove(info.getIntakeFeedJoint());
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Deregistered feed intake job [" + jobId + "]");
+            }
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Not removing feed joint as intake job is in " + FeedJobState.UNDER_RECOVERY + " state.");
+            }
+        }
+
+    }
+
+    /**
+     * Dispatches a job start notification to the intake or collect start handler,
+     * depending on the type of the registered job. Notifications for jobs that
+     * are not registered are logged and ignored.
+     *
+     * @param message
+     *            notification carrying the id of the started job
+     */
+    private void handleJobStartMessage(Message message) throws Exception {
+        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+        if (jobInfo == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Ignoring job start notification for unregistered job [" + message.jobId + "]");
+            }
+            return;
+        }
+        switch (jobInfo.getJobType()) {
+            case INTAKE:
+                handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
+                break;
+            case FEED_CONNECT:
+                handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
+                break;
+        }
+
+    }
+
+    /**
+     * Dispatches a job finish notification to the intake or collect finish
+     * handler, depending on the type of the registered job. Notifications for
+     * jobs that are not registered are logged and ignored.
+     *
+     * @param message
+     *            notification carrying the id of the finished job
+     */
+    private void handleJobFinishMessage(Message message) throws Exception {
+        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+        if (jobInfo == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Ignoring job finish notification for unregistered job [" + message.jobId + "]");
+            }
+            return;
+        }
+        switch (jobInfo.getJobType()) {
+            case INTAKE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
+                }
+                handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
+                break;
+            case FEED_CONNECT:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Collect Job finished for  " + jobInfo);
+                }
+                handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
+                break;
+        }
+
+    }
+
+    /**
+     * Records where the intake operators of a started intake job run, marks the
+     * intake joint and the job as {@code ACTIVE}, and notifies subscribers with
+     * {@code FEED_INTAKE_STARTED}.
+     *
+     * @param intakeJobInfo
+     *            info of the intake job that started
+     */
+    private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
+        List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<OperatorDescriptorId>();
+        for (IOperatorDescriptor opDesc : intakeJobInfo.getSpec().getOperatorMap().values()) {
+            if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+                intakeOperatorIds.add(opDesc.getOperatorId());
+            }
+        }
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
+        List<String> intakeLocations = new ArrayList<String>();
+        for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
+            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
+            if (operatorLocations == null) {
+                // no location reported for this operator; skip it, as setLocations does for storage operators
+                continue;
+            }
+            int nOperatorInstances = operatorLocations.size();
+            for (int i = 0; i < nOperatorInstances; i++) {
+                intakeLocations.add(operatorLocations.get(i));
+            }
+        }
+        // intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator
+        intakeJobInfo.setIntakeLocation(intakeLocations);
+        intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
+        intakeJobInfo.setState(FeedJobState.ACTIVE);
+
+        // notify event listeners
+        notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+    }
+
+    /**
+     * Handles the start of a collect job: records operator locations, activates
+     * every joint of the feed whose provider is this connection (remembering a
+     * {@code COMPUTE} joint on the job info), marks the job {@code ACTIVE},
+     * registers the feed activity and notifies subscribers with
+     * {@code FEED_COLLECT_STARTED}.
+     *
+     * @param cInfo
+     *            info of the collect job that started
+     */
+    private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
+        // set locations of feed sub-operations (intake, compute, store)
+        setLocations(cInfo);
+
+        // activate joints
+        FeedConnectionId connectionId = cInfo.getConnectionId();
+        List<IFeedJoint> joints = feedPipeline.get(connectionId.getFeedId());
+        if (joints != null) {
+            for (IFeedJoint joint : joints) {
+                // compare from the connection id side so a joint without a provider cannot cause an NPE
+                if (connectionId.equals(joint.getProvider())) {
+                    joint.setState(State.ACTIVE);
+                    if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
+                        cInfo.setComputeFeedJoint(joint);
+                    }
+                }
+            }
+        }
+        cInfo.setState(FeedJobState.ACTIVE);
+
+        // register activity in metadata
+        registerFeedActivity(cInfo);
+        // notify event listeners
+        notifyFeedEventSubscribers(cInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
+    }
+
+    /**
+     * Delivers a lifecycle event to the subscribers affected by a job. For an
+     * intake job these are the subscribers of every connection on the same feed;
+     * for any other job, the subscribers of the job's own connection.
+     *
+     * @param jobInfo
+     *            job the event is about
+     * @param event
+     *            event to deliver
+     */
+    private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
+        List<FeedConnectionId> affected = new ArrayList<FeedConnectionId>();
+        if (!jobInfo.getJobType().equals(JobType.INTAKE)) {
+            affected.add(((FeedConnectJobInfo) jobInfo).getConnectionId());
+        } else {
+            FeedId intakeFeedId = ((FeedIntakeInfo) jobInfo).getFeedId();
+            for (FeedConnectionId candidate : eventSubscribers.keySet()) {
+                if (candidate.getFeedId().equals(intakeFeedId)) {
+                    affected.add(candidate);
+                }
+            }
+        }
+
+        for (FeedConnectionId target : affected) {
+            List<IFeedLifecycleEventSubscriber> listeners = eventSubscribers.get(target);
+            if (listeners == null) {
+                continue;
+            }
+            for (IFeedLifecycleEventSubscriber listener : listeners) {
+                listener.handleFeedEvent(event);
+            }
+        }
+    }
+
+    /**
+     * Submits work that subscribes a connection request at the locations of the
+     * given joint: the intake locations of the owner feed for an {@code INTAKE}
+     * joint, or the compute locations of the providing connection for a
+     * {@code COMPUTE} joint.
+     *
+     * @param feedJoint
+     *            joint to subscribe to
+     * @param request
+     *            connection request handed to the subscribe work
+     * @throws IllegalStateException
+     *             if no locations are known for the joint (job not registered, or
+     *             its locations not recorded yet)
+     */
+    public synchronized void submitFeedConnectionRequest(IFeedJoint feedJoint, final FeedConnectionRequest request)
+            throws Exception {
+        List<String> locations = null;
+        switch (feedJoint.getType()) {
+            case INTAKE:
+                FeedIntakeInfo intakeInfo = intakeJobInfos.get(feedJoint.getOwnerFeedId());
+                if (intakeInfo != null) {
+                    locations = intakeInfo.getIntakeLocation();
+                }
+                break;
+            case COMPUTE:
+                FeedConnectJobInfo cInfo = connectJobInfos.get(feedJoint.getProvider());
+                if (cInfo != null) {
+                    locations = cInfo.getComputeLocations();
+                }
+                break;
+            default:
+                break;
+        }
+
+        if (locations == null) {
+            // fail with a message that names the joint instead of an anonymous NullPointerException
+            throw new IllegalStateException("No locations known for feed joint " + feedJoint
+                    + "; cannot submit feed connection request");
+        }
+
+        SubscribeFeedWork work = new SubscribeFeedWork(locations.toArray(new String[locations.size()]), request);
+        FeedWorkManager.INSTANCE.submitWork(work, new SubscribeFeedWork.FeedSubscribeWorkEventListener());
+    }
+
+    /**
+     * @param connectionId
+     *            connection to look up
+     * @return the source joint of the connection's registered connect job, or
+     *         {@code null} if no connect job is registered for the connection
+     */
+    public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered == null ? null : registered.getSourceFeedJoint();
+    }
+
+    /**
+     * @return a new set holding the connection ids of all registered connect jobs
+     *         whose state is {@code ACTIVE}
+     */
+    public Set<FeedConnectionId> getActiveFeedConnections() {
+        Set<FeedConnectionId> active = new HashSet<FeedConnectionId>();
+        for (FeedConnectJobInfo candidate : connectJobInfos.values()) {
+            if (!candidate.getState().equals(FeedJobState.ACTIVE)) {
+                continue;
+            }
+            active.add(candidate.getConnectionId());
+        }
+        return active;
+    }
+
+    /**
+     * @param connectionId
+     *            connection to check
+     * @return {@code true} if a connect job is registered for the connection and
+     *         its state is {@code ACTIVE}; {@code false} otherwise
+     */
+    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered != null && registered.getState().equals(FeedJobState.ACTIVE);
+    }
+
+    /**
+     * Sets the state of the connect job registered for a connection. Fails with a
+     * {@link NullPointerException} if no connect job is registered for it.
+     *
+     * @param connectionId
+     *            connection whose job is updated
+     * @param jobState
+     *            new state
+     */
+    public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
+        connectJobInfos.get(connectionId).setState(jobState);
+    }
+
+    /**
+     * Fails with a {@link NullPointerException} if no connect job is registered
+     * for the connection.
+     *
+     * @param connectionId
+     *            connection to look up
+     * @return the state of the connection's connect job
+     */
+    public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered.getState();
+    }
+
+    /**
+     * Handles the end of an intake job: deregisters the job and notifies
+     * subscribers with {@code FEED_INTAKE_FAILURE} if the job status is
+     * {@code FAILURE}, or {@code FEED_ENDED} otherwise.
+     *
+     * @param intakeInfo
+     *            info of the intake job that finished
+     * @param message
+     *            finish notification carrying the job id
+     */
+    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobInfo info = hcc.getJobInfo(message.jobId);
+        JobStatus status = info.getStatus();
+        // null-safe, matching the check in handleFeedCollectJobFinishMessage
+        boolean failure = status != null && status.equals(JobStatus.FAILURE);
+        FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED;
+
+        // remove feed joints
+        deregisterFeedIntakeJob(message.jobId);
+
+        // notify event listeners
+        notifyFeedEventSubscribers(intakeInfo, event);
+
+    }
+
+    /**
+     * Handles the end of a collect job.
+     * <ul>
+     * <li>The subscription is kept when the job is {@code UNDER_RECOVERY}, or when
+     * it failed and the policy says to continue on hardware failure. Otherwise the
+     * connection is removed from its source joint and joints are cleaned up.</li>
+     * <li>Job bookkeeping is dropped only when the job did not fail.</li>
+     * <li>The feed activity is always deregistered, and subscribers are notified
+     * with {@code FEED_COLLECT_FAILURE} or {@code FEED_ENDED}.</li>
+     * </ul>
+     *
+     * @param cInfo
+     *            info of the collect job that finished
+     */
+    private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
+        FeedConnectionId connectionId = cInfo.getConnectionId();
+
+        JobInfo info = AsterixAppContextInfo.getInstance().getHcc().getJobInfo(cInfo.getJobId());
+        JobStatus status = info.getStatus();
+        boolean failure = status != null && status.equals(JobStatus.FAILURE);
+        FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
+
+        boolean retainSubscription = cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
+                || (failure && fpa.continueOnHardwareFailure());
+        if (!retainSubscription) {
+            cInfo.getSourceFeedJoint().removeReceiver(connectionId);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
+            }
+            removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
+        }
+
+        // job history is kept for failed jobs
+        if (!failure) {
+            connectJobInfos.remove(connectionId);
+            jobInfos.remove(cInfo.getJobId());
+            feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
+        }
+        deregisterFeedActivity(cInfo);
+
+        // notify event listeners
+        notifyFeedEventSubscribers(cInfo,
+                failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED);
+    }
+
+    /**
+     * Builds the activity details of a connect job (collect, compute and storage
+     * locations when known, policy name, connect timestamp) and reports them to
+     * the feed load manager. A failure to report is logged, not propagated.
+     *
+     * @param cInfo
+     *            info of the connect job whose activity is reported
+     */
+    private void registerFeedActivity(FeedConnectJobInfo cInfo) {
+        Map<String, String> feedActivityDetails = new HashMap<String, String>();
+
+        // the collect locations are stored under the INTAKE_LOCATIONS key
+        putJoinedLocations(feedActivityDetails, FeedActivity.FeedActivityDetails.INTAKE_LOCATIONS,
+                cInfo.getCollectLocations());
+        putJoinedLocations(feedActivityDetails, FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS,
+                cInfo.getComputeLocations());
+        putJoinedLocations(feedActivityDetails, FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS,
+                cInfo.getStorageLocations());
+
+        String policyName = cInfo.getFeedPolicy().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+        feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
+
+        feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
+        try {
+            FeedConnectionId connectionId = cInfo.getConnectionId();
+            FeedActivity feedActivity = new FeedActivity(connectionId.getFeedId().getDataverse(), connectionId
+                    .getFeedId().getFeedName(), connectionId.getDatasetName(), feedActivityDetails);
+            CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(connectionId, feedActivity);
+        } catch (Exception e) {
+            // log through the logger with the cause attached instead of printing the stack trace
+            LOGGER.log(Level.WARNING, "Unable to register feed activity for " + cInfo, e);
+        }
+
+    }
+
+    /** Stores the comma-joined locations under the given key; does nothing when locations is null. */
+    private void putJoinedLocations(Map<String, String> details, String key, List<String> locations) {
+        if (locations != null) {
+            details.put(key, StringUtils.join(locations.iterator(), ','));
+        }
+    }
+
+    /**
+     * Removes the feed activity of a connect job from the feed load manager. A
+     * failure to remove it is logged, not propagated.
+     *
+     * @param cInfo
+     *            info of the connect job whose activity is removed
+     */
+    public void deregisterFeedActivity(FeedConnectJobInfo cInfo) {
+        try {
+            CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(cInfo.getConnectionId());
+        } catch (Exception e) {
+            // attach the exception so the cause is not reduced to its message
+            LOGGER.log(Level.WARNING, "Unable to deregister feed activity for " + cInfo, e);
+        }
+    }
+
+    /**
+     * Removes the joints of a terminated connection from its feed's pipeline:
+     * the source joint when it has fewer than two receivers, and the compute
+     * joint (if the job has one) under the same condition.
+     *
+     * @param connectionId
+     *            connection whose pipeline terminated; a connect job must still be
+     *            registered for it
+     */
+    public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
+        FeedConnectJobInfo jobInfo = connectJobInfos.get(connectionId);
+        List<IFeedJoint> pipelineJoints = feedPipeline.get(connectionId.getFeedId());
+
+        IFeedJoint source = jobInfo.getSourceFeedJoint();
+        if (source.getReceivers().size() < 2) {
+            pipelineJoints.remove(source);
+        }
+
+        IFeedJoint compute = jobInfo.getComputeFeedJoint();
+        if (compute == null) {
+            return;
+        }
+        if (compute.getReceivers().size() < 2) {
+            pipelineJoints.remove(compute);
+        }
+    }
+
+    /**
+     * @param jobId
+     *            job id to check
+     * @return {@code true} if a feed job (intake or connect) is registered under this id
+     */
+    public boolean isRegisteredFeedJob(JobId jobId) {
+        FeedJobInfo registered = jobInfos.get(jobId);
+        return registered != null;
+    }
+
+    /**
+     * Looks up the compute locations of a feed through the first joint on the
+     * feed's pipeline whose key belongs to the feed and whose providing
+     * connection has a registered connect job.
+     *
+     * @param feedId
+     *            feed to look up
+     * @return the compute locations recorded for that connect job, or
+     *         {@code null} if the feed has no pipeline or no such joint
+     */
+    public List<String> getFeedComputeLocations(FeedId feedId) {
+        List<IFeedJoint> feedJoints = feedPipeline.get(feedId);
+        if (feedJoints == null) {
+            // unknown feed: same "not found" answer as when no joint matches
+            return null;
+        }
+        for (IFeedJoint joint : feedJoints) {
+            if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
+                FeedConnectJobInfo cInfo = connectJobInfos.get(joint.getProvider());
+                if (cInfo != null) {
+                    return cInfo.getComputeLocations();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Fails with a {@link NullPointerException} if no connect job is registered
+     * for the connection.
+     *
+     * @param connectionId
+     *            connection to look up
+     * @return the storage locations recorded for the connection's connect job
+     */
+    public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered.getStorageLocations();
+    }
+
+    /**
+     * Fails with a {@link NullPointerException} if no connect job is registered
+     * for the connection.
+     *
+     * @param connectionId
+     *            connection to look up
+     * @return the collect locations recorded for the connection's connect job
+     */
+    public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered.getCollectLocations();
+    }
+
+    /**
+     * Fails with a {@link NullPointerException} if no intake job is registered
+     * for the feed.
+     *
+     * @param feedId
+     *            feed to look up
+     * @return the intake locations recorded for the feed's intake job
+     */
+    public List<String> getFeedIntakeLocations(FeedId feedId) {
+        FeedIntakeInfo registered = intakeJobInfos.get(feedId);
+        return registered.getIntakeLocation();
+    }
+
+    /**
+     * Fails with a {@link NullPointerException} if no connect job is registered
+     * for the connection.
+     *
+     * @param connectionId
+     *            connection to look up
+     * @return the job id of the connection's connect job
+     */
+    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered.getJobId();
+    }
+
+    /**
+     * Adds a lifecycle event subscriber for a connection, creating the
+     * connection's subscriber list on first use.
+     *
+     * @param connectionId
+     *            connection the subscriber is interested in
+     * @param subscriber
+     *            subscriber to add
+     */
+    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+        List<IFeedLifecycleEventSubscriber> listeners = eventSubscribers.get(connectionId);
+        if (listeners != null) {
+            listeners.add(subscriber);
+            return;
+        }
+        listeners = new ArrayList<IFeedLifecycleEventSubscriber>();
+        eventSubscribers.put(connectionId, listeners);
+        listeners.add(subscriber);
+    }
+
+    /**
+     * Removes a lifecycle event subscriber from a connection. Does nothing when
+     * the connection has no subscriber list.
+     *
+     * @param connectionId
+     *            connection the subscriber was registered for
+     * @param subscriber
+     *            subscriber to remove
+     */
+    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+        List<IFeedLifecycleEventSubscriber> listeners = eventSubscribers.get(connectionId);
+        if (listeners == null) {
+            return;
+        }
+        listeners.remove(subscriber);
+    }
+
+    //============================
+
+    /**
+     * @param feedJointKey
+     *            key of the joint to look for
+     * @return {@code true} if a joint with exactly this key is registered on the
+     *         pipeline of the key's feed
+     */
+    public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
+        List<IFeedJoint> candidates = feedPipeline.get(feedJointKey.getFeedId());
+        if (candidates == null) {
+            return false;
+        }
+        for (IFeedJoint candidate : candidates) {
+            if (candidate.getFeedJointKey().equals(feedJointKey)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return a new list with the intake joint of every registered intake job
+     */
+    public Collection<IFeedJoint> getFeedIntakeJoints() {
+        Collection<FeedIntakeInfo> registered = intakeJobInfos.values();
+        List<IFeedJoint> intakeJoints = new ArrayList<IFeedJoint>(registered.size());
+        for (FeedIntakeInfo intake : registered) {
+            intakeJoints.add(intake.getIntakeFeedJoint());
+        }
+        return intakeJoints;
+    }
+
+    /**
+     * @param feedPointKey
+     *            key of the joint to look for
+     * @return the joint registered on the key's feed with exactly this key, or
+     *         {@code null} if there is none
+     */
+    public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
+        List<IFeedJoint> candidates = feedPipeline.get(feedPointKey.getFeedId());
+        if (candidates == null) {
+            return null;
+        }
+        for (IFeedJoint candidate : candidates) {
+            if (candidate.getFeedJointKey().equals(feedPointKey)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Finds the joint to use for a requested key. An exact match wins. Otherwise
+     * the result is chosen among the joints on the key's feed whose string
+     * representation is contained in the requested key's representation: a later
+     * joint replaces the current candidate when its representation contains the
+     * candidate's.
+     *
+     * @param feedJointKey
+     *            requested joint key
+     * @return the chosen joint, or {@code null} if nothing matches
+     */
+    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
+        IFeedJoint exact = getFeedJoint(feedJointKey);
+        if (exact != null) {
+            return exact;
+        }
+
+        String requestedRep = feedJointKey.getStringRep();
+        List<IFeedJoint> onPipeline = feedPipeline.get(feedJointKey.getFeedId());
+        if (onPipeline == null) {
+            return null;
+        }
+
+        IFeedJoint best = null;
+        for (IFeedJoint joint : onPipeline) {
+            if (!requestedRep.contains(joint.getFeedJointKey().getStringRep())) {
+                continue;
+            }
+            // take the first match, then upgrade whenever a later match is a super set of the current one
+            if (best == null
+                    || joint.getFeedJointKey().getStringRep().contains(best.getFeedJointKey().getStringRep())) {
+                best = joint;
+            }
+        }
+        return best;
+    }
+
+    /**
+     * Fails with a {@link NullPointerException} if no connect job is registered
+     * for the connection.
+     *
+     * @param connectionId
+     *            connection to look up
+     * @return the job specification of the connection's connect job
+     */
+    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
+        FeedConnectJobInfo registered = connectJobInfos.get(connectionId);
+        return registered.getSpec();
+    }
+
+    /**
+     * @param sourceFeedId
+     *            feed whose pipeline is searched
+     * @param type
+     *            joint type to look for
+     * @return the first joint of the given type on the feed's pipeline, or
+     *         {@code null} if the feed has no pipeline or no joint of that type
+     */
+    public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) {
+        List<IFeedJoint> joints = feedPipeline.get(sourceFeedId);
+        if (joints == null) {
+            // unknown feed: answer "not found" like getFeedJoint does, instead of throwing
+            return null;
+        }
+        for (IFeedJoint joint : joints) {
+            if (joint.getType().equals(type)) {
+                return joint;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param connectionId
+     *            connection to look up
+     * @return the connect job info registered for the connection, or {@code null}
+     *         if there is none
+     */
+    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
+        return connectJobInfos.get(connectionId);
+    }
+
+    /**
+     * Classifies the operators of a connect job into collect, compute and
+     * storage operators, looks up where each runs, and records the three
+     * location lists on the job info. If a compute operator has no reported
+     * locations, the compute locations are reset to the collect locations.
+     * Failures while querying or recording locations are logged, not propagated.
+     *
+     * @param cInfo
+     *            connect job whose locations are recorded
+     */
+    private void setLocations(FeedConnectJobInfo cInfo) {
+        JobSpecification jobSpec = cInfo.getSpec();
+
+        List<OperatorDescriptorId> collectOperatorIds = new ArrayList<OperatorDescriptorId>();
+        List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+        List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
+
+        Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            // a feed meta operator wraps the operator that decides the classification
+            IOperatorDescriptor actualOp = opDesc instanceof FeedMetaOperatorDescriptor
+                    ? ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator() : opDesc;
+
+            if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
+                if (isComputeOperator(jobSpec, (AlgebricksMetaOperatorDescriptor) actualOp)) {
+                    computeOperatorIds.add(entry.getKey());
+                }
+            } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                storageOperatorIds.add(entry.getKey());
+            } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
+                collectOperatorIds.add(entry.getKey());
+            }
+        }
+
+        try {
+            IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+            JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+
+            List<String> collectLocations = new ArrayList<String>();
+            for (OperatorDescriptorId collectOpId : collectOperatorIds) {
+                appendLocations(info.getOperatorLocations().get(collectOpId), collectLocations);
+            }
+
+            List<String> computeLocations = new ArrayList<String>();
+            for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
+                if (operatorLocations != null) {
+                    appendLocations(operatorLocations, computeLocations);
+                } else {
+                    // no locations reported for this compute operator: fall back to the collect locations
+                    computeLocations.clear();
+                    computeLocations.addAll(collectLocations);
+                }
+            }
+
+            List<String> storageLocations = new ArrayList<String>();
+            for (OperatorDescriptorId storageOpId : storageOperatorIds) {
+                appendLocations(info.getOperatorLocations().get(storageOpId), storageLocations);
+            }
+
+            cInfo.setCollectLocations(collectLocations);
+            cInfo.setComputeLocations(computeLocations);
+            cInfo.setStorageLocations(storageLocations);
+
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Unable to set operator locations for " + cInfo, e);
+        }
+
+    }
+
+    /**
+     * Tells whether an Algebricks meta operator is a compute operator: its pipeline
+     * has an assign runtime and its first input comes from a feed collect operator.
+     */
+    private boolean isComputeOperator(JobSpecification jobSpec, AlgebricksMetaOperatorDescriptor op) {
+        boolean hasAssign = false;
+        for (IPushRuntimeFactory rf : op.getPipeline().getRuntimeFactories()) {
+            if (rf instanceof AssignRuntimeFactory) {
+                hasAssign = true;
+                break;
+            }
+        }
+        if (!hasAssign) {
+            return false;
+        }
+        List<IConnectorDescriptor> inputs = jobSpec.getOperatorInputMap().get(op.getOperatorId());
+        if (inputs == null || inputs.isEmpty()) {
+            // an operator without inputs cannot be fed by a collect operator
+            return false;
+        }
+        IOperatorDescriptor sourceOp = jobSpec.getConnectorOperatorMap().get(inputs.get(0).getConnectorId())
+                .getLeft().getLeft();
+        return sourceOp instanceof FeedCollectOperatorDescriptor;
+    }
+
+    /**
+     * Appends the locations for keys 0..size-1, in key order, to the target list;
+     * does nothing when no locations were reported (null map).
+     */
+    private void appendLocations(Map<Integer, String> operatorLocations, List<String> target) {
+        if (operatorLocations == null) {
+            return;
+        }
+        int nOperatorInstances = operatorLocations.size();
+        for (int i = 0; i < nOperatorInstances; i++) {
+            target.add(operatorLocations.get(i));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java
new file mode 100644
index 0000000..43f227d
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedJoint.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedJointKey;
+
+public class FeedJoint implements IFeedJoint {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedJoint.class.getName());
+
+    /** A unique key associated with the feed point **/
+    private final FeedJointKey key;
+
+    /** The state associated with the FeedJoint **/
+    private State state;
+
+    /** A list of subscribers that receive data from this FeedJoint **/
+    private final List<FeedConnectionId> receivers;
+
+    /** The feedId on which the feedPoint resides **/
+    private final FeedId ownerFeedId;
+
+    /** A list of feed subscription requests submitted for subscribing to the FeedPoint's data **/
+    private final List<FeedConnectionRequest> connectionRequests;
+
+    private final ConnectionLocation connectionLocation;
+
+    private final FeedJointType type;
+
+    private FeedConnectionId provider;
+
+    public FeedJoint(FeedJointKey key, FeedId ownerFeedId, ConnectionLocation subscriptionLocation, FeedJointType type,
+            FeedConnectionId provider) {
+        this.key = key;
+        this.ownerFeedId = ownerFeedId;
+        this.type = type;
+        this.receivers = new ArrayList<FeedConnectionId>();
+        this.state = State.CREATED;
+        this.connectionLocation = subscriptionLocation;
+        this.connectionRequests = new ArrayList<FeedConnectionRequest>();
+        this.provider = provider;
+    }
+
+    @Override
+    public int hashCode() {
+        return key.hashCode();
+    }
+
+    public void addReceiver(FeedConnectionId connectionId) {
+        receivers.add(connectionId);
+    }
+
+    public void removeReceiver(FeedConnectionId connectionId) {
+        receivers.remove(connectionId);
+    }
+
+    public synchronized void addConnectionRequest(FeedConnectionRequest request) {
+        connectionRequests.add(request);
+        if (state.equals(State.ACTIVE)) {
+            handlePendingConnectionRequest();
+        }
+    }
+
+    public synchronized void setState(State state) {
+        if (this.state.equals(state)) {
+            return;
+        }
+        this.state = state;
+        if (this.state.equals(State.ACTIVE)) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Feed joint " + this + " is now " + State.ACTIVE);
+            }
+            handlePendingConnectionRequest();
+        }
+    }
+
+    private void handlePendingConnectionRequest() {
+        for (FeedConnectionRequest connectionRequest : connectionRequests) {
+            FeedConnectionId connectionId = new FeedConnectionId(connectionRequest.getReceivingFeedId(),
+                    connectionRequest.getTargetDataset());
+            try {
+                FeedLifecycleListener.INSTANCE.submitFeedConnectionRequest(this, connectionRequest);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Submitted feed connection request " + connectionRequest + " at feed joint " + this);
+                }
+                addReceiver(connectionId);
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unsuccessful attempt at submitting connection request " + connectionRequest
+                            + " at feed joint " + this + ". Message " + e.getMessage());
+                }
+                e.printStackTrace();
+            }
+        }
+        connectionRequests.clear();
+    }
+
+    public FeedConnectionId getReceiver(FeedConnectionId connectionId) {
+        for (FeedConnectionId cid : receivers) {
+            if (cid.equals(connectionId)) {
+                return cid;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return key.toString() + " [" + connectionLocation + "]" + "[" + state + "]";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null) {
+            return false;
+        }
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof FeedJoint)) {
+            return false;
+        }
+        return ((FeedJoint) o).getFeedJointKey().equals(this.key);
+    }
+
+    public FeedId getOwnerFeedId() {
+        return ownerFeedId;
+    }
+
+    public List<FeedConnectionRequest> getConnectionRequests() {
+        return connectionRequests;
+    }
+
+    public ConnectionLocation getConnectionLocation() {
+        return connectionLocation;
+    }
+
+    public FeedJointType getType() {
+        return type;
+    }
+
+    @Override
+    public FeedConnectionId getProvider() {
+        return provider;
+    }
+
+    public List<FeedConnectionId> getReceivers() {
+        return receivers;
+    }
+
+    public FeedJointKey getKey() {
+        return key;
+    }
+
+    public synchronized State getState() {
+        return state;
+    }
+
+    @Override
+    public FeedJointKey getFeedJointKey() {
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java
new file mode 100644
index 0000000..aac3675
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedLifecycleListener.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.api.common.SessionConfig.OutputFormat;
+import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IFeedLifecycleListener;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.management.FeedCollectInfo;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedJointKey;
+import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+import org.apache.asterix.external.feed.watch.FeedJobInfo;
+import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.cluster.AddNodeWork;
+import org.apache.asterix.metadata.cluster.ClusterManager;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * A listener that subscribes to events associated with cluster membership
+ * (nodes joining/leaving the cluster) and job lifecycle (start/end of a job).
+ * Subscription to such events allows keeping track of feed ingestion jobs and
+ * take any corrective action that may be required when a node involved in a
+ * feed leaves the cluster.
+ */
+public class FeedLifecycleListener implements IFeedLifecycleListener {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
+
+    /** The singleton instance; created when this class is initialized. */
+    public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+
+    /** Job start/finish events consumed by {@link #feedJobNotificationHandler}. */
+    private final LinkedBlockingQueue<Message> jobEventInbox;
+    /** Cluster work responses consumed by {@link #feedWorkRequestResponseHandler}. */
+    private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+    /** For each dependent feed, the ids of the nodes that must rejoin before it can be revived. */
+    private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
+    private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
+    private final FeedJobNotificationHandler feedJobNotificationHandler;
+    private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
+    private final ExecutorService executorService;
+
+    /** The cluster state last observed by this listener. */
+    private ClusterState state;
+
+    /**
+     * Creates the inboxes, starts the two handler tasks on a cached thread pool and registers this
+     * listener with the {@link ClusterManager}.
+     */
+    private FeedLifecycleListener() {
+        this.jobEventInbox = new LinkedBlockingQueue<Message>();
+        this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
+        this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
+        this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
+        this.feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
+        this.executorService = Executors.newCachedThreadPool();
+        this.executorService.execute(feedJobNotificationHandler);
+        this.executorService.execute(feedWorkRequestResponseHandler);
+        ClusterManager.INSTANCE.registerSubscriber(this);
+        this.state = AsterixClusterProperties.INSTANCE.getState();
+    }
+
+    /** Enqueues a {@code JOB_START} event if the job is a registered feed job. */
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+        if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
+            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
+        }
+    }
+
+    /** Enqueues a {@code JOB_FINISH} event if the job is a registered feed job. */
+    @Override
+    public void notifyJobFinish(JobId jobId) throws HyracksException {
+        if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
+            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
+            }
+        }
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
+        return feedJobNotificationHandler.getFeedConnectJobInfo(connectionId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
+            IIntakeProgressTracker feedIntakeProgressTracker) {
+        feedJobNotificationHandler.registerFeedIntakeProgressTracker(connectionId, feedIntakeProgressTracker);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
+        feedJobNotificationHandler.deregisterFeedIntakeProgressTracker(connectionId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public void updateTrackingInformation(StorageReportFeedMessage srm) {
+        feedJobNotificationHandler.updateTrackingInformation(srm);
+    }
+
+    /**
+     * Traverses the job specification to categorize the job as a feed intake job or a feed
+     * collection job and registers it accordingly. Only the first feed collect/intake operator
+     * encountered is considered; jobs containing neither are ignored.
+     */
+    @Override
+    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+        JobSpecification spec = acggf.getJobSpecification();
+        for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+            if (opDesc instanceof FeedCollectOperatorDescriptor) {
+                FeedCollectOperatorDescriptor collectOp = (FeedCollectOperatorDescriptor) opDesc;
+                FeedConnectionId feedConnectionId = collectOp.getFeedConnectionId();
+                Map<String, String> feedPolicy = collectOp.getFeedPolicyProperties();
+                feedJobNotificationHandler.registerFeedCollectionJob(collectOp.getSourceFeedId(), feedConnectionId,
+                        jobId, spec, feedPolicy);
+                break;
+            } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+                feedJobNotificationHandler.registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(),
+                        jobId, spec);
+                break;
+            }
+        }
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
+        feedJobNotificationHandler.setJobState(connectionId, jobState);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
+        return feedJobNotificationHandler.getFeedJobState(connectionId);
+    }
+
+    /** A job lifecycle event (start or finish) for a registered feed job. */
+    public static class Message {
+        public JobId jobId;
+
+        public enum MessageKind {
+            JOB_START,
+            JOB_FINISH
+        }
+
+        public MessageKind messageKind;
+
+        public Message(JobId jobId, MessageKind msgKind) {
+            this.jobId = jobId;
+            this.messageKind = msgKind;
+        }
+    }
+
+    /**
+     * Determines which feed jobs are impacted by the failed nodes, marks them as
+     * {@code UNDER_RECOVERY} and, if any job is impacted, requests replacement nodes.
+     * <p>
+     * An intake job is impacted by a dead node listed in its intake locations. A connect job is
+     * impacted by a dead node listed in its compute or collect locations; it is skipped for a dead
+     * node that appears in its storage locations.
+     *
+     * @param deadNodeIds
+     *            the ids of the nodes that left the cluster
+     * @return the work to be done by the cluster manager; empty if no feed job is impacted
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+        Set<IClusterManagementWork> workToBeDone = new HashSet<IClusterManagementWork>();
+
+        Collection<FeedIntakeInfo> intakeInfos = feedJobNotificationHandler.getFeedIntakeInfos();
+        Collection<FeedConnectJobInfo> connectJobInfos = feedJobNotificationHandler.getFeedConnectInfos();
+
+        Map<String, List<FeedJobInfo>> impactedJobs = new HashMap<String, List<FeedJobInfo>>();
+
+        for (String deadNode : deadNodeIds) {
+            for (FeedIntakeInfo intakeInfo : intakeInfos) {
+                if (intakeInfo.getIntakeLocation().contains(deadNode)) {
+                    addImpactedJob(impactedJobs, deadNode, intakeInfo);
+                    intakeInfo.setState(FeedJobState.UNDER_RECOVERY);
+                }
+            }
+
+            for (FeedConnectJobInfo connectInfo : connectJobInfos) {
+                if (connectInfo.getStorageLocations().contains(deadNode)) {
+                    continue;
+                }
+                if (connectInfo.getComputeLocations().contains(deadNode)
+                        || connectInfo.getCollectLocations().contains(deadNode)) {
+                    addImpactedJob(impactedJobs, deadNode, connectInfo);
+                    connectInfo.setState(FeedJobState.UNDER_RECOVERY);
+                    feedJobNotificationHandler.deregisterFeedActivity(connectInfo);
+                }
+            }
+        }
+
+        if (!impactedJobs.isEmpty()) {
+            AddNodeWork addNodeWork = new AddNodeWork(deadNodeIds, deadNodeIds.size(), this);
+            feedWorkRequestResponseHandler.registerFeedWork(addNodeWork.getWorkId(), impactedJobs);
+            workToBeDone.add(addNodeWork);
+        }
+        return workToBeDone;
+    }
+
+    /** Appends {@code jobInfo} to the list of jobs impacted by {@code deadNode}, creating the list if needed. */
+    private static void addImpactedJob(Map<String, List<FeedJobInfo>> impactedJobs, String deadNode,
+            FeedJobInfo jobInfo) {
+        List<FeedJobInfo> infos = impactedJobs.get(deadNode);
+        if (infos == null) {
+            infos = new ArrayList<FeedJobInfo>();
+            impactedJobs.put(deadNode, infos);
+        }
+        infos.add(jobInfo);
+    }
+
+    /** Pairs the recoverable connect jobs and the recoverable intake joints with their associated node lists. */
+    public static class FailureReport {
+
+        private final List<Pair<FeedConnectJobInfo, List<String>>> recoverableConnectJobs;
+        private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
+
+        public FailureReport(Map<IFeedJoint, List<String>> recoverableIntakeFeedIds,
+                List<Pair<FeedConnectJobInfo, List<String>>> recoverableSubscribers) {
+            this.recoverableConnectJobs = recoverableSubscribers;
+            this.recoverableIntakeFeedIds = recoverableIntakeFeedIds;
+        }
+
+        public List<Pair<FeedConnectJobInfo, List<String>>> getRecoverableSubscribers() {
+            return recoverableConnectJobs;
+        }
+
+        public Map<IFeedJoint, List<String>> getRecoverableIntakeFeedIds() {
+            return recoverableIntakeFeedIds;
+        }
+
+    }
+
+    /**
+     * Reacts to a node joining the cluster. If the cluster state has just changed to
+     * {@code ACTIVE}, a {@code FeedsActivator} is started on a new thread; otherwise the joined node
+     * is removed from the required-node lists of the dependent feeds and those feeds whose list
+     * became empty are revived.
+     *
+     * @param joinedNodeId
+     *            the id of the node that joined
+     * @return always {@code null}; this listener requests no cluster work on a node join
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
+        }
+
+        boolean needToReActivateFeeds = !newState.equals(state) && (newState == ClusterState.ACTIVE);
+        if (needToReActivateFeeds) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
+            }
+            try {
+                FeedsActivator activator = new FeedsActivator();
+                (new Thread(activator)).start();
+            } catch (Exception e) {
+                // Failing to resume feeds is a real problem: log it as a warning, with the cause.
+                LOGGER.log(Level.WARNING, "Exception in resuming feeds: " + e.getMessage(), e);
+            }
+            state = newState;
+        } else {
+            List<FeedCollectInfo> feedsThatCanBeRevived = new ArrayList<FeedCollectInfo>();
+            for (Entry<FeedCollectInfo, List<String>> entry : dependentFeeds.entrySet()) {
+                List<String> requiredNodeIds = entry.getValue();
+                // remove(Object) reports whether the node was actually awaited by this feed.
+                if (requiredNodeIds.remove(joinedNodeId) && requiredNodeIds.isEmpty()) {
+                    feedsThatCanBeRevived.add(entry.getKey());
+                }
+            }
+            if (!feedsThatCanBeRevived.isEmpty()) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
+                }
+                FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
+                (new Thread(activator)).start();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Hands the response over to the {@link FeedWorkRequestResponseHandler} through its inbox. If
+     * the calling thread is interrupted while waiting, the response is dropped and the thread's
+     * interrupt status is restored.
+     */
+    @Override
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        try {
+            responseInbox.put(response);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so that callers further up the stack can observe it.
+            Thread.currentThread().interrupt();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Interrupted exception");
+            }
+        }
+    }
+
+    /**
+     * Currently a no-op: the reactivation of feeds on an {@code UNUSABLE} to {@code ACTIVE}
+     * transition is disabled (see the TODO below).
+     */
+    @Override
+    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+        switch (newState) {
+            case ACTIVE:
+                if (previousState.equals(ClusterState.UNUSABLE)) {
+                    // TODO: Figure out why code was commented
+                    // FeedsActivator activator = new FeedsActivator();
+                    // (new Thread(activator)).start();
+                }
+                break;
+            default:
+                break;
+        }
+
+    }
+
+    /** Disconnects a list of failed feed connections by executing a disconnect-feed statement for each. */
+    public static class FeedsDeActivator implements Runnable {
+
+        private final List<FeedConnectJobInfo> failedConnectjobs;
+
+        public FeedsDeActivator(List<FeedConnectJobInfo> failedConnectjobs) {
+            this.failedConnectjobs = failedConnectjobs;
+        }
+
+        @Override
+        public void run() {
+            for (FeedConnectJobInfo failedConnectJob : failedConnectjobs) {
+                endFeed(failedConnectJob);
+            }
+        }
+
+        /**
+         * Compiles and synchronously executes a disconnect-feed statement for the given connection
+         * inside a metadata transaction. On failure the transaction is aborted (if one was begun)
+         * and the error is logged; no exception escapes this method.
+         */
+        private void endFeed(FeedConnectJobInfo cInfo) {
+            MetadataTransactionContext ctx = null;
+            PrintWriter writer = new PrintWriter(System.out, true);
+            SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+
+            try {
+                ctx = MetadataManager.INSTANCE.beginTransaction();
+                FeedId feedId = cInfo.getConnectionId().getFeedId();
+                DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(feedId.getDataverse()),
+                        new Identifier(feedId.getFeedName()), new Identifier(cInfo.getConnectionId().getDatasetName()));
+                List<Statement> statements = new ArrayList<Statement>();
+                DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
+                statements.add(dataverseDecl);
+                statements.add(stmt);
+                QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                        QueryTranslator.ResultDelivery.SYNC);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
+                }
+                MetadataManager.INSTANCE.commitTransaction(ctx);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Exception in ending loser feed: " + cInfo.getConnectionId()
+                        + " Exception " + e.getMessage(), e);
+                if (ctx == null) {
+                    // beginTransaction() itself failed, so there is no transaction to abort.
+                    return;
+                }
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (Exception e2) {
+                    // Keep the original failure attached to the abort failure that gets logged.
+                    e2.addSuppressed(e);
+                    LOGGER.log(Level.SEVERE, "Exception in aborting transaction! System is in inconsistent state",
+                            e2);
+                }
+            }
+        }
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public void submitFeedConnectionRequest(IFeedJoint feedPoint, FeedConnectionRequest subscriptionRequest)
+            throws Exception {
+        feedJobNotificationHandler.submitFeedConnectionRequest(feedPoint, subscriptionRequest);
+    }
+
+    /**
+     * Returns the active feed connections of the given feed.
+     *
+     * @param feedId
+     *            the feed to filter by, or {@code null} to return all active connections
+     * @return a new list owned by the caller
+     */
+    @Override
+    public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId) {
+        List<FeedConnectionId> connections = new ArrayList<FeedConnectionId>();
+        Collection<FeedConnectionId> activeConnections = feedJobNotificationHandler.getActiveFeedConnections();
+        if (feedId != null) {
+            for (FeedConnectionId connectionId : activeConnections) {
+                if (connectionId.getFeedId().equals(feedId)) {
+                    connections.add(connectionId);
+                }
+            }
+        } else {
+            connections.addAll(activeConnections);
+        }
+        return connections;
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public List<String> getComputeLocations(FeedId feedId) {
+        return feedJobNotificationHandler.getFeedComputeLocations(feedId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public List<String> getIntakeLocations(FeedId feedId) {
+        return feedJobNotificationHandler.getFeedIntakeLocations(feedId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public List<String> getStoreLocations(FeedConnectionId feedConnectionId) {
+        return feedJobNotificationHandler.getFeedStorageLocations(feedConnectionId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public List<String> getCollectLocations(FeedConnectionId feedConnectionId) {
+        return feedJobNotificationHandler.getFeedCollectLocations(feedConnectionId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+        return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
+    }
+
+    /** Asks the {@link FeedJobNotificationHandler} to remove the feed joints of the terminated pipeline. */
+    public void reportPartialDisconnection(FeedConnectionId connectionId) {
+        feedJobNotificationHandler.removeFeedJointsPostPipelineTermination(connectionId);
+    }
+
+    /** Registers the report queue for the given connection, replacing any previous one. */
+    public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+        feedReportQueue.put(feedId, queue);
+    }
+
+    /**
+     * Removes the report queue registered for the given connection.
+     *
+     * @param queue
+     *            unused; whatever queue is registered for {@code feedId} is removed
+     */
+    public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
+        feedReportQueue.remove(feedId);
+    }
+
+    /** Returns the report queue registered for the given connection, or {@code null} if none. */
+    public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
+        return feedReportQueue.get(feedId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
+        return feedJobNotificationHandler.getAvailableFeedJoint(feedJointKey);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
+        return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public void registerFeedJoint(IFeedJoint feedJoint) {
+        feedJobNotificationHandler.registerFeedJoint(feedJoint);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
+        return feedJobNotificationHandler.getFeedJoint(feedJointKey);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+        feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, subscriber);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    @Override
+    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+        feedJobNotificationHandler.deregisterFeedEventSubscriber(connectionId, subscriber);
+
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
+        return feedJobNotificationHandler.getCollectJobSpecification(connectionId);
+    }
+
+    /** Delegates to the {@link FeedJobNotificationHandler}. */
+    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
+        return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java
new file mode 100644
index 0000000..ee4da11
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedLoadManager.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.api.IFeedLoadManager;
+import org.apache.asterix.external.feed.api.IFeedTrackingManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedCongestionMessage;
+import org.apache.asterix.external.feed.message.FeedReportMessage;
+import org.apache.asterix.external.feed.message.PrepareStallMessage;
+import org.apache.asterix.external.feed.message.ScaleInReportMessage;
+import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.feed.watch.NodeLoadReport;
+import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
+import org.apache.asterix.file.FeedOperations;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedLoadManager implements IFeedLoadManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
+
+    private static final long MIN_MODIFICATION_INTERVAL = 180000; // 10 seconds
+    private final TreeSet<NodeLoadReport> nodeReports;
+    private final Map<FeedConnectionId, FeedActivity> feedActivities;
+    private final Map<String, Pair<Integer, Integer>> feedMetrics;
+
+    private FeedConnectionId lastModified;
+    private long lastModifiedTimestamp;
+
+    private static final int UNKNOWN = -1;
+
+    public FeedLoadManager() {
+        this.nodeReports = new TreeSet<NodeLoadReport>();
+        this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
+        this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
+    }
+
+    @Override
+    public void submitNodeLoadReport(NodeLoadReport report) {
+        nodeReports.remove(report);
+        nodeReports.add(report);
+    }
+
+    @Override
+    public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
+        FeedRuntimeId runtimeId = message.getRuntimeId();
+        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+        if (jobState == null
+                || (jobState.equals(FeedJobState.UNDER_RECOVERY))
+                || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
+                        - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ignoring congestion report from " + runtimeId);
+            }
+            return;
+        } else {
+            try {
+                FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+                int inflowRate = message.getInflowRate();
+                int outflowRate = message.getOutflowRate();
+                List<String> currentComputeLocations = new ArrayList<String>();
+                currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
+                        .getConnectionId().getFeedId()));
+                int computeCardinality = currentComputeLocations.size();
+                int requiredCardinality = (int) Math
+                        .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
+                int additionalComputeNodes = requiredCardinality - computeCardinality;
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
+                            + additionalComputeNodes);
+                }
+
+                List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
+
+                // Step 1) Alter the original feed job to adjust the cardinality
+                JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+                        .getConnectionId());
+                helperComputeNodes.addAll(currentComputeLocations);
+                List<String> newLocations = new ArrayList<String>();
+                newLocations.addAll(currentComputeLocations);
+                newLocations.addAll(helperComputeNodes);
+                FeedMetadataUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
+
+                // Step 2) send prepare to  stall message
+                gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
+
+                // Step 3) run the altered job specification 
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("New Job after adjusting to the workload " + jobSpec);
+                }
+
+                Thread.sleep(10000);
+                runJob(jobSpec, false);
+                lastModified = message.getConnectionId();
+                lastModifiedTimestamp = System.currentTimeMillis();
+
+            } catch (Exception e) {
+                e.printStackTrace();
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
+                }
+                throw new AsterixException(e);
+            }
+        }
+    }
+
+    @Override
+    public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
+        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+        if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
+            }
+            return;
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Processing scale-in message " + message);
+            }
+            FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+            JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+                    .getConnectionId());
+            int reducedCardinality = message.getReducedCardinaliy();
+            List<String> currentComputeLocations = new ArrayList<String>();
+            currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
+                    .getFeedId()));
+            FeedMetadataUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
+                    currentComputeLocations);
+
+            gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
+            Thread.sleep(3000);
+            JobId newJobId = runJob(jobSpec, false);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
+            }
+
+        }
+    }
+
+    private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
+            throws Exception {
+        // Step 1) send prepare to  stall message
+        PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
+        List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+        List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+
+        Set<String> operatorLocations = new HashSet<String>();
+
+        operatorLocations.addAll(intakeLocations);
+        operatorLocations.addAll(computeLocations);
+        operatorLocations.addAll(storageLocations);
+
+        JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
+        runJob(messageJobSpec, true);
+
+        // Step 2)
+        TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
+        messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
+        runJob(messageJobSpec, true);
+    }
+
+    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobId jobId = hcc.startJob(spec);
+        if (waitForCompletion) {
+            hcc.waitForCompletion(jobId);
+        }
+        return jobId;
+    }
+
+    @Override
+    public void submitFeedRuntimeReport(FeedReportMessage report) {
+        String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
+        Pair<Integer, Integer> value = feedMetrics.get(key);
+        if (value == null) {
+            value = new Pair<Integer, Integer>(report.getValue(), 1);
+            feedMetrics.put(key, value);
+        } else {
+            value.first = value.first + report.getValue();
+            value.second = value.second + 1;
+        }
+    }
+
+    @Override
+    public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
+        int rVal;
+        String key = "" + connectionId + ":" + runtimeType;
+        feedMetrics.get(key);
+        Pair<Integer, Integer> value = feedMetrics.get(key);
+        if (value == null) {
+            rVal = UNKNOWN;
+        } else {
+            rVal = value.first / value.second;
+        }
+        return rVal;
+    }
+
+    private List<String> getNodeForSubstitution(int nRequired) {
+        List<String> nodeIds = new ArrayList<String>();
+        Iterator<NodeLoadReport> it = null;
+        int nAdded = 0;
+        while (nAdded < nRequired) {
+            it = nodeReports.iterator();
+            while (it.hasNext()) {
+                nodeIds.add(it.next().getNodeId());
+                nAdded++;
+            }
+        }
+        return nodeIds;
+    }
+
+    @Override
+    public synchronized List<String> getNodes(int required) {
+        Iterator<NodeLoadReport> it;
+        List<String> allocated = new ArrayList<String>();
+        while (allocated.size() < required) {
+            it = nodeReports.iterator();
+            while (it.hasNext() && allocated.size() < required) {
+                allocated.add(it.next().getNodeId());
+            }
+        }
+        return allocated;
+    }
+
+    @Override
+    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
+        System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
+        FeedConnectionId connectionId = mesg.getConnectionId();
+        List<String> destinationLocations = new ArrayList<String>();
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+
+        destinationLocations.addAll(storageLocations);
+        destinationLocations.addAll(collectLocations);
+        JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
+                destinationLocations);
+        runJob(messageJobSpec, true);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
+        }
+        IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
+        trackingManager.disableAcking(connectionId);
+    }
+
+    @Override
+    public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
+        feedActivities.put(connectionId, activity);
+    }
+
+    @Override
+    public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
+        return feedActivities.get(connectionId);
+    }
+
+    @Override
+    public Collection<FeedActivity> getFeedActivities() {
+        return feedActivities.values();
+    }
+
+    @Override
+    public void removeFeedActivity(FeedConnectionId connectionId) {
+        feedActivities.remove(connectionId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
new file mode 100644
index 0000000..66eca0c
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.util.logging.Level;
+
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.api.IFeedLoadManager;
+import org.apache.asterix.external.feed.api.IFeedTrackingManager;
+import org.apache.asterix.external.feed.api.IFeedMessage.MessageType;
+import org.apache.asterix.external.feed.message.FeedCongestionMessage;
+import org.apache.asterix.external.feed.message.FeedReportMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.asterix.external.feed.message.ScaleInReportMessage;
+import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.watch.NodeLoadReport;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.feed.CentralFeedManager.AQLExecutor;
+import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
+
+/**
+ * Receives feed messages as JSON strings and dispatches each one, by its message type, to the
+ * feed load manager, the feed tracking manager, the feed lifecycle listener or the AQL executor.
+ */
+public class FeedMessageReceiver extends MessageReceiver<String> {
+
+    /**
+     * Whether {@code FeedBootstrap.setUpInitialArtifacts()} has already been run for an XAQL message.
+     * Static, so the flag is shared by every receiver instance.
+     */
+    private static boolean initialized;
+
+    private final IFeedLoadManager feedLoadManager;
+    private final IFeedTrackingManager feedTrackingManager;
+
+    /**
+     * @param centralFeedManager source of the load manager and tracking manager that messages are
+     *            dispatched to
+     */
+    public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
+        this.feedLoadManager = centralFeedManager.getFeedLoadManager();
+        this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
+    }
+
+    /**
+     * Parses the message as a JSON object, reads its message type and forwards the decoded payload
+     * to the matching handler. Message types without a case below are ignored.
+     *
+     * @param message the JSON text of the message
+     * @throws Exception if the message cannot be parsed or the handler fails
+     */
+    @Override
+    public void processMessage(String message) throws Exception {
+        JSONObject obj = new JSONObject(message);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Received message " + obj);
+        }
+        MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
+        switch (messageType) {
+            case XAQL:
+                // The initial artifacts are set up once, before the first AQL statement is executed.
+                if (!initialized) {
+                    FeedBootstrap.setUpInitialArtifacts();
+                    initialized = true;
+                }
+                AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
+                break;
+            case CONGESTION:
+                feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
+                break;
+            case FEED_REPORT:
+                feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
+                break;
+            case NODE_REPORT:
+                feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
+                break;
+            case SCALE_IN_REQUEST:
+                feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
+                break;
+            case STORAGE_REPORT:
+                FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
+                break;
+            case COMMIT_ACK:
+                feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
+                break;
+            case THROTTLING_ENABLED:
+                feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
+                // Explicit break: avoids falling through if another case is ever added below.
+                break;
+            default:
+                break;
+        }
+
+    }
+}


Mime
View raw message