asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [9/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
Date Sun, 15 May 2016 19:04:02 GMT
Cleanup Feed CodeBase

In order to expedite cleaning feeds up and reaching a maintainable
state, we disabled the following:
1. Policies (At least once, throttling, discarding, elasticity).
2. Statistics Reporting.
3. Load management.
4. Feed re-activation upon System reboot.
Right now on master, none of these features work reliably.
We will re-introduce them one feature at a time.

The rules followed in this change:
1. Keep X if X is tested in a test case.
2. Remove X if X is not used in test cases.

After a few meetings with Mike and Till, the policies
1. Buffering
2. Spill
3. Discard
4. Throttle
have been fixed and unit tests have been added.

Change-Id: I545bc4f8560564e4c868a80d27c77a4edd97a8b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/798
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/fba622b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/fba622b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/fba622b3

Branch: refs/heads/master
Commit: fba622b3ae6e5850e9110212e66c410ce4f00359
Parents: 0716dc0
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Sat May 14 13:01:16 2016 -0700
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Sun May 15 12:03:00 2016 -0700

----------------------------------------------------------------------
 .../api/common/AsterixAppRuntimeContext.java    |   8 +-
 .../api/http/servlet/ConnectorAPIServlet.java   |   3 +-
 .../asterix/api/http/servlet/FeedServlet.java   |  39 +-
 .../api/http/servlet/FeedServletUtil.java       |  76 ---
 .../app/external/CentralFeedManager.java        | 110 ----
 .../external/FeedJobNotificationHandler.java    |  84 ---
 .../app/external/FeedLifecycleListener.java     | 282 ---------
 .../asterix/app/external/FeedLoadManager.java   | 300 ---------
 .../app/external/FeedMessageReceiver.java       |  96 ---
 .../asterix/app/external/FeedOperations.java    |  98 ---
 .../app/external/FeedTrackingManager.java       | 187 ------
 .../app/external/FeedWorkCollection.java        |  93 +--
 .../FeedWorkRequestResponseHandler.java         | 269 ---------
 .../asterix/app/external/FeedsActivator.java    | 118 ----
 .../asterix/aql/translator/QueryTranslator.java |  12 +-
 .../bootstrap/CCApplicationEntryPoint.java      |   7 -
 .../bootstrap/ClusterLifecycleListener.java     |   8 +-
 .../hyracks/bootstrap/FeedBootstrap.java        |  62 --
 .../bootstrap/GlobalRecoveryManager.java        |   9 +-
 .../http/servlet/ConnectorAPIServletTest.java   |   3 +-
 .../src/test/resources/runtimets/testsuite.xml  |  44 +-
 .../common/exceptions/IExceptionHandler.java    |  43 ++
 asterixdb/asterix-external-data/pom.xml         |  41 +-
 .../external/api/IAdapterRuntimeManager.java    |  84 ---
 .../dataflow/FeedRecordDataFlowController.java  |   8 +-
 .../external/dataflow/FeedTupleForwarder.java   |  29 +-
 .../external/feed/api/ICentralFeedManager.java  |  34 --
 .../external/feed/api/IExceptionHandler.java    |  43 --
 .../feed/api/IFeedConnectionManager.java        |   2 +-
 .../external/feed/api/IFeedFrameHandler.java    |  39 --
 .../IFeedLifecycleIntakeEventSubscriber.java    |  28 -
 .../feed/api/IFeedLifecycleListener.java        |   3 +-
 .../external/feed/api/IFeedLoadManager.java     |  60 --
 .../asterix/external/feed/api/IFeedManager.java |  72 ---
 .../external/feed/api/IFeedMemoryComponent.java |  58 --
 .../external/feed/api/IFeedMemoryManager.java   |  58 --
 .../asterix/external/feed/api/IFeedMessage.java |  13 +-
 .../external/feed/api/IFeedMessageService.java  |  34 --
 .../external/feed/api/IFeedMetadataManager.java |  39 --
 .../external/feed/api/IFeedMetricCollector.java |  50 --
 .../api/IFeedOperatorOutputSideHandler.java     |  49 --
 .../external/feed/api/IFeedProvider.java        |  26 -
 .../asterix/external/feed/api/IFeedRuntime.java |  23 +-
 .../asterix/external/feed/api/IFeedService.java |  26 -
 .../feed/api/IFeedSubscriptionManager.java      |  41 --
 .../external/feed/api/IFeedTrackingManager.java |  29 -
 .../external/feed/api/IFrameEventCallback.java  |  32 -
 .../external/feed/api/IFramePostProcessor.java  |  28 -
 .../external/feed/api/IFramePreprocessor.java   |  26 -
 .../feed/api/IIntakeProgressTracker.java        |  29 -
 .../external/feed/api/IMessageReceiver.java     |  28 -
 .../external/feed/api/ISubscribableRuntime.java |  28 +-
 .../external/feed/api/ISubscriberRuntime.java   |   3 -
 .../feed/api/ISubscriptionProvider.java         |  29 -
 .../CollectTransformFeedFrameWriter.java        | 125 ----
 .../external/feed/dataflow/DataBucket.java      |  89 ---
 .../external/feed/dataflow/DataBucketPool.java  | 110 ----
 .../dataflow/DistributeFeedFrameWriter.java     |  64 +-
 .../FeedCollectRuntimeInputHandler.java         |  57 --
 .../feed/dataflow/FeedExceptionHandler.java     |  25 +-
 .../external/feed/dataflow/FeedFrameCache.java  | 177 ------
 .../feed/dataflow/FeedFrameCollector.java       |  96 +--
 .../feed/dataflow/FeedFrameDiscarder.java       |  67 --
 .../feed/dataflow/FeedFrameHandlers.java        | 309 ----------
 .../feed/dataflow/FeedFrameSpiller.java         | 188 ------
 .../feed/dataflow/FeedFrameTupleAccessor.java   | 110 ----
 .../feed/dataflow/FeedFrameTupleDecorator.java  | 108 ----
 .../feed/dataflow/FeedRuntimeInputHandler.java  | 605 ++++++++-----------
 .../external/feed/dataflow/FrameAction.java     |  54 ++
 .../external/feed/dataflow/FrameCollection.java | 101 ----
 .../feed/dataflow/FrameDistributor.java         | 422 ++++---------
 .../feed/dataflow/FrameEventCallback.java       | 103 ----
 .../external/feed/dataflow/FrameSpiller.java    | 217 +++++++
 .../feed/dataflow/StorageFrameHandler.java      | 119 ----
 .../dataflow/SyncFeedRuntimeInputHandler.java   |  72 +++
 .../feed/management/ConcurrentFramePool.java    | 204 +++++++
 .../feed/management/FeedConnectionId.java       |   9 +-
 .../feed/management/FeedConnectionManager.java  |   4 +-
 .../external/feed/management/FeedManager.java   |  84 +--
 .../feed/management/FeedMemoryManager.java      | 114 ----
 .../feed/management/FeedMetadataManager.java    | 112 ----
 .../management/FeedSubscriptionManager.java     |  76 ---
 .../feed/message/FeedCongestionMessage.java     | 102 ----
 .../feed/message/FeedMessageService.java        | 145 -----
 .../feed/message/FeedReportMessage.java         |  99 ---
 .../feed/message/FeedTupleCommitAckMessage.java |  98 ---
 .../message/FeedTupleCommitResponseMessage.java |  81 ---
 .../external/feed/message/MessageListener.java  | 126 ----
 .../external/feed/message/MessageReceiver.java  | 119 ----
 .../feed/message/NodeReportMessage.java         |  68 ---
 .../feed/message/PrepareStallMessage.java       |  68 ---
 .../message/RemoteSocketMessageListener.java    | 134 ----
 .../feed/message/ScaleInReportMessage.java      | 113 ----
 .../feed/message/SocketMessageListener.java     | 160 -----
 .../feed/message/StorageReportFeedMessage.java  | 128 ----
 .../feed/message/TerminateDataFlowMessage.java  |  52 --
 .../message/ThrottlingEnabledFeedMessage.java   |  85 ---
 .../external/feed/message/XAQLFeedMessage.java  |  66 --
 .../external/feed/runtime/AdapterExecutor.java  |  11 +-
 .../feed/runtime/AdapterRuntimeManager.java     |  63 +-
 .../feed/runtime/CollectionRuntime.java         |  26 +-
 .../external/feed/runtime/FeedRuntime.java      |  38 +-
 .../external/feed/runtime/FeedRuntimeId.java    |  28 +-
 .../external/feed/runtime/IngestionRuntime.java |  60 +-
 .../feed/runtime/SubscribableFeedRuntimeId.java |  53 --
 .../feed/runtime/SubscribableRuntime.java       |  45 +-
 .../feed/watch/BasicMonitoredBuffer.java        |  80 ---
 .../feed/watch/ComputeSideMonitoredBuffer.java  |  79 ---
 .../feed/watch/FeedMetricCollector.java         | 189 ------
 .../feed/watch/IntakePartitionStatistics.java   |  41 --
 .../feed/watch/IntakeSideMonitoredBuffer.java   |  80 ---
 .../external/feed/watch/MonitoredBuffer.java    | 401 ------------
 .../feed/watch/MonitoredBufferTimerTasks.java   | 299 ---------
 .../asterix/external/feed/watch/NodeLoad.java   |  62 --
 .../external/feed/watch/NodeLoadReport.java     | 100 ---
 .../feed/watch/NodeLoadReportService.java       | 107 ----
 .../asterix/external/feed/watch/Series.java     |  44 --
 .../asterix/external/feed/watch/SeriesAvg.java  |  47 --
 .../asterix/external/feed/watch/SeriesRate.java |  92 ---
 .../feed/watch/StorageSideMonitoredBuffer.java  | 209 -------
 .../input/stream/SocketServerInputStream.java   |   4 +-
 .../FeedCollectOperatorDescriptor.java          |  38 +-
 .../FeedCollectOperatorNodePushable.java        | 160 +----
 .../operators/FeedIntakeOperatorDescriptor.java |  23 +-
 .../FeedIntakeOperatorNodePushable.java         | 200 ++----
 .../FeedMessageOperatorNodePushable.java        | 187 +-----
 .../operators/FeedMetaComputeNodePushable.java  | 134 ++--
 .../operators/FeedMetaNodePushable.java         | 189 ------
 .../operators/FeedMetaOperatorDescriptor.java   |  11 -
 .../operators/FeedMetaStoreNodePushable.java    | 134 ++--
 .../util/ExternalDataExceptionUtils.java        |  18 +-
 .../feed/test/FeedMemoryManagerUnitTest.java    | 482 +++++++++++++++
 .../external/feed/test/FeedSpillerUnitTest.java | 178 ++++++
 .../metadata/declared/AqlMetadataProvider.java  | 358 +++++------
 .../metadata/feeds/FeedMetadataUtil.java        |  75 +--
 .../apache/hyracks/api/comm/IFrameWriter.java   |  25 +-
 .../api/context/IHyracksFrameMgrContext.java    |  15 +-
 .../api/context/IHyracksTaskContext.java        |   2 +
 .../org/apache/hyracks/control/nc/Task.java     |   5 +
 .../common/io/MessagingFrameTupleAppender.java  |  17 +-
 .../std/connectors/PartitionDataWriter.java     |   8 +-
 .../hyracks/test/support/TestTaskContext.java   |   7 +-
 142 files changed, 2318 insertions(+), 10614 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 8342be5..5b3e453 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -51,7 +51,6 @@ import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
@@ -120,7 +119,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
-    private IFeedManager feedManager;
+    private FeedManager feedManager;
 
     private IReplicationChannel replicationChannel;
     private IReplicationManager replicationManager;
@@ -173,7 +172,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
                 ioManager, ncApplicationContext.getNodeId(), metadataProperties);
 
-        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
+        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
+                .createRepository();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
                 this);
@@ -377,7 +377,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     }
 
     @Override
-    public IFeedManager getFeedManager() {
+    public FeedManager getFeedManager() {
         return feedManager;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
index 79ce721..c70950a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -29,7 +29,6 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.app.external.CentralFeedManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -84,7 +83,7 @@ public class ConnectorAPIServlet extends HttpServlet {
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
 
             // Retrieves file splits of the dataset.
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null, CentralFeedManager.getInstance());
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null);
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
             if (dataset == null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index eacee6d..8bc613a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -32,9 +32,6 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.app.external.CentralFeedManager;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.watch.FeedActivity;
@@ -86,52 +83,26 @@ public class FeedServlet extends HttpServlet {
         }
 
         String outStr = null;
-        if (requestURI.startsWith("/webui/static")) {
-            outStr = sb.toString();
-        } else {
-            Collection<FeedActivity> lfa = CentralFeedManager.getInstance().getFeedLoadManager().getFeedActivities();
-            StringBuilder ldStr = new StringBuilder();
-            ldStr.append("<br />");
-            ldStr.append("<br />");
-            if (lfa == null || lfa.isEmpty()) {
-                ldStr.append("Currently there are no active feeds in AsterixDB");
-            } else {
-                ldStr.append("Active Feeds");
-            }
-            insertTable(ldStr, lfa);
-            outStr = String.format(sb.toString(), ldStr.toString());
-
-        }
+        outStr = sb.toString();
 
         PrintWriter out = response.getWriter();
         out.println(outStr);
     }
 
+    @SuppressWarnings("unused")
     private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
-        html.append("<table style=\"width:100%\">");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_FEED_NAME + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_DATASET_NAME + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_ACTIVE_SINCE + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_STAGE + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_COMPUTE_STAGE + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_STAGE + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_RATE + "</th>");
-        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_RATE + "</th>");
-        for (FeedActivity activity : list) {
-            insertRow(html, activity);
-        }
     }
 
+    @SuppressWarnings("null")
     private void insertRow(StringBuilder html, FeedActivity activity) {
         String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
         String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
         String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
 
-        IFeedLoadManager loadManager = CentralFeedManager.getInstance().getFeedLoadManager();
         FeedConnectionId connectionId = new FeedConnectionId(
                 new FeedId(activity.getDataverseName(), activity.getFeedName()), activity.getDatasetName());
-        int intakeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.COLLECT) * intake.split(",").length;
-        int storeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.STORE) * store.split(",").length;
+        int intakeRate = 0;
+        int storeRate = 0;
 
         html.append("<tr>");
         html.append("<td>" + activity.getFeedName() + "</td>");

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
deleted file mode 100644
index 52a140d..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServletUtil.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.api.http.servlet;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.app.external.FeedLifecycleListener;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.RemoteSocketMessageListener;
-
-public class FeedServletUtil {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedServletUtil.class.getName());
-    private static final char EOL = (char) "\n".getBytes()[0];
-
-    public static final class Constants {
-        public static final String TABLE_HEADER_FEED_NAME = "Feed";
-        public static final String TABLE_HEADER_DATASET_NAME = "Dataset";
-        public static final String TABLE_HEADER_ACTIVE_SINCE = "Timestamp";
-        public static final String TABLE_HEADER_INTAKE_STAGE = "Intake Stage";
-        public static final String TABLE_HEADER_COMPUTE_STAGE = "Compute Stage";
-        public static final String TABLE_HEADER_STORE_STAGE = "Store Stage";
-        public static final String TABLE_HEADER_INTAKE_RATE = "Intake";
-        public static final String TABLE_HEADER_STORE_RATE = "Store";
-    }
-
-    public static void initiateSubscription(FeedConnectionId feedId, String host, int port) throws IOException {
-        LinkedBlockingQueue<String> outbox = new LinkedBlockingQueue<String>();
-        int subscriptionPort = port + 1;
-        Socket sc = new Socket(host, subscriptionPort);
-        InputStream in = sc.getInputStream();
-
-        CharBuffer buffer = CharBuffer.allocate(50);
-        char ch = 0;
-        while (ch != EOL) {
-            buffer.put(ch);
-            ch = (char) in.read();
-        }
-        buffer.flip();
-        sc.close();
-
-        String s = new String(buffer.array());
-        int feedSubscriptionPort = Integer.parseInt(s.trim());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Response from Super Feed Manager Report Service " + port + " will connect at " + host + " "
-                    + port);
-        }
-
-        // register the feed subscription queue with FeedLifecycleListener
-        FeedLifecycleListener.INSTANCE.registerFeedReportQueue(feedId, outbox);
-        RemoteSocketMessageListener listener = new RemoteSocketMessageListener(host, feedSubscriptionPort, outbox);
-        listener.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
deleted file mode 100644
index cab5e64..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/CentralFeedManager.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringReader;
-import java.util.List;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.api.ICentralFeedManager;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.message.SocketMessageListener;
-import org.apache.asterix.lang.aql.parser.AQLParserFactory;
-import org.apache.asterix.lang.common.base.IParser;
-import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class CentralFeedManager implements ICentralFeedManager {
-
-    private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    public static ICentralFeedManager getInstance() {
-        return centralFeedManager;
-    }
-
-    private final int port;
-    private final IFeedLoadManager feedLoadManager;
-    private final IFeedTrackingManager feedTrackingManager;
-    private final SocketMessageListener messageListener;
-
-    private CentralFeedManager() {
-        this.port = AsterixAppContextInfo.getInstance().getFeedProperties().getFeedCentralManagerPort();
-        this.feedLoadManager = new FeedLoadManager();
-        this.feedTrackingManager = new FeedTrackingManager();
-        this.messageListener = new SocketMessageListener(port, new FeedMessageReceiver(this));
-    }
-
-    @Override
-    public void start() throws AsterixException {
-        messageListener.start();
-    }
-
-    @Override
-    public void stop() throws AsterixException, IOException {
-        messageListener.stop();
-    }
-
-    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobId jobId = hcc.startJob(spec);
-        if (waitForCompletion) {
-            hcc.waitForCompletion(jobId);
-        }
-        return jobId;
-    }
-
-    @Override
-    public IFeedLoadManager getFeedLoadManager() {
-        return feedLoadManager;
-    }
-
-    @Override
-    public IFeedTrackingManager getFeedTrackingManager() {
-        return feedTrackingManager;
-    }
-
-    public static class AQLExecutor {
-
-        private static final PrintWriter out = new PrintWriter(System.out, true);
-        private static final IParserFactory parserFactory = new AQLParserFactory();
-
-        public static void executeAQL(String aql) throws Exception {
-            IParser parser = parserFactory.createParser(new StringReader(aql));
-            List<Statement> statements = parser.parse();
-            SessionConfig pc = new SessionConfig(out, OutputFormat.ADM);
-            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                    QueryTranslator.ResultDelivery.SYNC);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index a143578..cfc2125 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -21,7 +21,6 @@ package org.apache.asterix.app.external;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,15 +39,12 @@ import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.State;
 import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.management.FeedJointKey;
 import org.apache.asterix.external.feed.management.FeedWorkManager;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.feed.watch.FeedJobInfo;
@@ -57,9 +53,7 @@ import org.apache.asterix.external.feed.watch.FeedJobInfo.JobType;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -85,7 +79,6 @@ public class FeedJobNotificationHandler implements Runnable {
     private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
     private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
     private final Map<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
-    private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
 
     public FeedJobNotificationHandler(LinkedBlockingQueue<FeedEvent> inbox) {
         this.inbox = inbox;
@@ -94,7 +87,6 @@ public class FeedJobNotificationHandler implements Runnable {
         this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
         this.feedPipeline = new HashMap<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>>();
         this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
-        this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
     }
 
     @Override
@@ -124,29 +116,6 @@ public class FeedJobNotificationHandler implements Runnable {
         }
     }
 
-    public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
-            IIntakeProgressTracker feedIntakeProgressTracker) {
-        if (feedIntakeProgressTrackers.get(connectionId) == null) {
-            this.feedIntakeProgressTrackers.put(connectionId,
-                    new Pair<IIntakeProgressTracker, Long>(feedIntakeProgressTracker, 0L));
-        } else {
-            throw new IllegalStateException(
-                    " Progress tracker for connection " + connectionId + " is alreader registered");
-        }
-    }
-
-    public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
-        this.feedIntakeProgressTrackers.remove(connectionId);
-    }
-
-    public void updateTrackingInformation(StorageReportFeedMessage srm) {
-        Pair<IIntakeProgressTracker, Long> p = feedIntakeProgressTrackers.get(srm.getConnectionId());
-        if (p != null && p.second < srm.getLastPersistedTupleIntakeTimestamp()) {
-            p.second = srm.getLastPersistedTupleIntakeTimestamp();
-            p.first.notifyIngestedTupleTimestamp(p.second);
-        }
-    }
-
     public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
         return intakeJobInfos.values();
     }
@@ -358,8 +327,6 @@ public class FeedJobNotificationHandler implements Runnable {
             }
         }
         cInfo.setState(FeedJobState.ACTIVE);
-        // register activity in metadata
-        registerFeedActivity(cInfo);
     }
 
     private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
@@ -489,63 +456,12 @@ public class FeedJobNotificationHandler implements Runnable {
 
         connectJobInfos.remove(connectionId);
         jobInfos.remove(cInfo.getJobId());
-        feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
-        deregisterFeedActivity(cInfo);
         // notify event listeners
         FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE
                 : FeedLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(cInfo, event);
     }
 
-    private void registerFeedActivity(FeedConnectJobInfo cInfo) {
-        Map<String, String> feedActivityDetails = new HashMap<String, String>();
-
-        if (cInfo.getCollectLocations() != null) {
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.INTAKE_LOCATIONS,
-                    StringUtils.join(cInfo.getCollectLocations().iterator(), ','));
-        }
-
-        if (cInfo.getComputeLocations() != null) {
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS,
-                    StringUtils.join(cInfo.getComputeLocations().iterator(), ','));
-        }
-
-        if (cInfo.getStorageLocations() != null) {
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS,
-                    StringUtils.join(cInfo.getStorageLocations().iterator(), ','));
-        }
-
-        String policyName = cInfo.getFeedPolicy().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-        feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
-
-        feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
-        try {
-            FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(),
-                    cInfo.getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
-                    feedActivityDetails);
-            CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(cInfo.getConnectionId(),
-                    feedActivity);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to register feed activity for " + cInfo + " " + e.getMessage());
-            }
-
-        }
-
-    }
-
-    public void deregisterFeedActivity(FeedConnectJobInfo cInfo) {
-        try {
-            CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(cInfo.getConnectionId());
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to deregister feed activity for " + cInfo + " " + e.getMessage());
-            }
-        }
-    }
-
     public boolean isRegisteredFeedJob(JobId jobId) {
         return jobInfos.get(jobId) != null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index 161c863..b8435af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -18,56 +18,27 @@
  */
 package org.apache.asterix.app.external;
 
-import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.api.IFeedLifecycleListener;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
-import org.apache.asterix.external.feed.management.FeedCollectInfo;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
 import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.ClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -84,31 +55,17 @@ import org.apache.hyracks.api.job.JobSpecification;
 public class FeedLifecycleListener implements IFeedLifecycleListener {
 
     private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
-
     public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
     private final LinkedBlockingQueue<FeedEvent> jobEventInbox;
-    private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
-    private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
-    private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
     private final FeedJobNotificationHandler feedJobNotificationHandler;
-    private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
     private final ExecutorService executorService;
 
-    private ClusterState state;
-
     private FeedLifecycleListener() {
         this.jobEventInbox = new LinkedBlockingQueue<FeedEvent>();
         this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
-        this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
-        this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
-        this.feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
         this.executorService = Executors.newCachedThreadPool();
         this.executorService.execute(feedJobNotificationHandler);
-        this.executorService.execute(feedWorkRequestResponseHandler);
-        ClusterManager.INSTANCE.registerSubscriber(this);
-        this.state = AsterixClusterProperties.INSTANCE.getState();
     }
 
     @Override
@@ -133,19 +90,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
         return feedJobNotificationHandler.getFeedConnectJobInfo(connectionId);
     }
 
-    public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
-            IIntakeProgressTracker feedIntakeProgressTracker) {
-        feedJobNotificationHandler.registerFeedIntakeProgressTracker(connectionId, feedIntakeProgressTracker);
-    }
-
-    public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
-        feedJobNotificationHandler.deregisterFeedIntakeProgressTracker(connectionId);
-    }
-
-    public void updateTrackingInformation(StorageReportFeedMessage srm) {
-        feedJobNotificationHandler.updateTrackingInformation(srm);
-    }
-
     /*
      * Traverse job specification to categorize job as a feed intake job or a feed collection job
      */
@@ -201,209 +145,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
         }
     }
 
-    @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        Set<IClusterManagementWork> workToBeDone = new HashSet<IClusterManagementWork>();
-
-        Collection<FeedIntakeInfo> intakeInfos = feedJobNotificationHandler.getFeedIntakeInfos();
-        Collection<FeedConnectJobInfo> connectJobInfos = feedJobNotificationHandler.getFeedConnectInfos();
-
-        Map<String, List<FeedJobInfo>> impactedJobs = new HashMap<String, List<FeedJobInfo>>();
-
-        for (String deadNode : deadNodeIds) {
-            for (FeedIntakeInfo intakeInfo : intakeInfos) {
-                if (intakeInfo.getIntakeLocation().contains(deadNode)) {
-                    List<FeedJobInfo> infos = impactedJobs.get(deadNode);
-                    if (infos == null) {
-                        infos = new ArrayList<FeedJobInfo>();
-                        impactedJobs.put(deadNode, infos);
-                    }
-                    infos.add(intakeInfo);
-                    intakeInfo.setState(FeedJobState.UNDER_RECOVERY);
-                }
-            }
-
-            for (FeedConnectJobInfo connectInfo : connectJobInfos) {
-                if (connectInfo.getStorageLocations().contains(deadNode)) {
-                    continue;
-                }
-                if (connectInfo.getComputeLocations().contains(deadNode)
-                        || connectInfo.getCollectLocations().contains(deadNode)) {
-                    List<FeedJobInfo> infos = impactedJobs.get(deadNode);
-                    if (infos == null) {
-                        infos = new ArrayList<FeedJobInfo>();
-                        impactedJobs.put(deadNode, infos);
-                    }
-                    infos.add(connectInfo);
-                    connectInfo.setState(FeedJobState.UNDER_RECOVERY);
-                    feedJobNotificationHandler.deregisterFeedActivity(connectInfo);
-                }
-            }
-
-        }
-
-        if (impactedJobs.size() > 0) {
-            AddNodeWork addNodeWork = new AddNodeWork(deadNodeIds, deadNodeIds.size(), this);
-            feedWorkRequestResponseHandler.registerFeedWork(addNodeWork.getWorkId(), impactedJobs);
-            workToBeDone.add(addNodeWork);
-        }
-        return workToBeDone;
-
-    }
-
-    public static class FailureReport {
-
-        private final List<Pair<FeedConnectJobInfo, List<String>>> recoverableConnectJobs;
-        private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
-
-        public FailureReport(Map<IFeedJoint, List<String>> recoverableIntakeFeedIds,
-                List<Pair<FeedConnectJobInfo, List<String>>> recoverableSubscribers) {
-            this.recoverableConnectJobs = recoverableSubscribers;
-            this.recoverableIntakeFeedIds = recoverableIntakeFeedIds;
-        }
-
-        public List<Pair<FeedConnectJobInfo, List<String>>> getRecoverableSubscribers() {
-            return recoverableConnectJobs;
-        }
-
-        public Map<IFeedJoint, List<String>> getRecoverableIntakeFeedIds() {
-            return recoverableIntakeFeedIds;
-        }
-
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
-        }
-
-        boolean needToReActivateFeeds = !newState.equals(state) && (newState == ClusterState.ACTIVE);
-        if (needToReActivateFeeds) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
-            }
-            try {
-                FeedsActivator activator = new FeedsActivator();
-                (new Thread(activator)).start();
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                }
-            }
-            state = newState;
-        } else {
-            List<FeedCollectInfo> feedsThatCanBeRevived = new ArrayList<FeedCollectInfo>();
-            for (Entry<FeedCollectInfo, List<String>> entry : dependentFeeds.entrySet()) {
-                List<String> requiredNodeIds = entry.getValue();
-                if (requiredNodeIds.contains(joinedNodeId)) {
-                    requiredNodeIds.remove(joinedNodeId);
-                    if (requiredNodeIds.isEmpty()) {
-                        feedsThatCanBeRevived.add(entry.getKey());
-                    }
-                }
-            }
-            if (!feedsThatCanBeRevived.isEmpty()) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
-                }
-                FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
-                (new Thread(activator)).start();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        try {
-            responseInbox.put(response);
-        } catch (InterruptedException e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Interrupted exception");
-            }
-        }
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        switch (newState) {
-            case ACTIVE:
-                if (previousState.equals(ClusterState.UNUSABLE)) {
-                    try {
-                        // TODO: Figure out why code was commented
-                        // FeedsActivator activator = new FeedsActivator();
-                        // (new Thread(activator)).start();
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                        }
-                    }
-                }
-                break;
-            default:
-                break;
-        }
-
-    }
-
-    public static class FeedsDeActivator implements Runnable {
-
-        private List<FeedConnectJobInfo> failedConnectjobs;
-
-        public FeedsDeActivator(List<FeedConnectJobInfo> failedConnectjobs) {
-            this.failedConnectjobs = failedConnectjobs;
-        }
-
-        @Override
-        public void run() {
-            for (FeedConnectJobInfo failedConnectJob : failedConnectjobs) {
-                endFeed(failedConnectJob);
-            }
-        }
-
-        private void endFeed(FeedConnectJobInfo cInfo) {
-            MetadataTransactionContext ctx = null;
-            PrintWriter writer = new PrintWriter(System.out, true);
-            SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-
-            try {
-                ctx = MetadataManager.INSTANCE.beginTransaction();
-                FeedId feedId = cInfo.getConnectionId().getFeedId();
-                DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(feedId.getDataverse()),
-                        new Identifier(feedId.getFeedName()), new Identifier(cInfo.getConnectionId().getDatasetName()));
-                List<Statement> statements = new ArrayList<Statement>();
-                DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
-                statements.add(dataverseDecl);
-                statements.add(stmt);
-                QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                        QueryTranslator.ResultDelivery.SYNC);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
-                }
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Exception in ending loser feed: " + cInfo.getConnectionId() + " Exception "
-                            + e.getMessage());
-                }
-                e.printStackTrace();
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
-                    }
-                }
-
-            }
-
-        }
-    }
-
     public void submitFeedConnectionRequest(IFeedJoint feedPoint, FeedConnectionRequest subscriptionRequest)
             throws Exception {
         feedJobNotificationHandler.submitFeedConnectionRequest(feedPoint, subscriptionRequest);
@@ -451,21 +192,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
         return feedJobNotificationHandler.isFeedConnectionActive(connectionId, eventSubscriber);
     }
 
-    public void reportPartialDisconnection(FeedConnectionId connectionId) {
-    }
-
-    public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
-        feedReportQueue.put(feedId, queue);
-    }
-
-    public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
-        feedReportQueue.remove(feedId);
-    }
-
-    public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
-        return feedReportQueue.get(feedId);
-    }
-
     @Override
     public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
         return feedJobNotificationHandler.getAvailableFeedJoint(feedJointKey);
@@ -495,14 +221,6 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
 
     }
 
-    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.getCollectJobSpecification(connectionId);
-    }
-
-    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
-    }
-
     public synchronized void notifyPartitionStart(FeedId feedId, JobId jobId) {
         jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PARTITION_START, feedId));
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java
deleted file mode 100644
index b6be1e7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLoadManager.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.PrepareStallMessage;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-import org.apache.asterix.external.feed.watch.FeedJobInfo.FeedJobState;
-import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedLoadManager implements IFeedLoadManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
-
-    private static final long MIN_MODIFICATION_INTERVAL = 180000; // 10 seconds
-    private final TreeSet<NodeLoadReport> nodeReports;
-    private final Map<FeedConnectionId, FeedActivity> feedActivities;
-    private final Map<String, Pair<Integer, Integer>> feedMetrics;
-
-    private FeedConnectionId lastModified;
-    private long lastModifiedTimestamp;
-
-    private static final int UNKNOWN = -1;
-
-    public FeedLoadManager() {
-        this.nodeReports = new TreeSet<NodeLoadReport>();
-        this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
-        this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
-    }
-
-    @Override
-    public void submitNodeLoadReport(NodeLoadReport report) {
-        nodeReports.remove(report);
-        nodeReports.add(report);
-    }
-
-    @Override
-    public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
-        FeedRuntimeId runtimeId = message.getRuntimeId();
-        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
-        if (jobState == null
-                || (jobState.equals(FeedJobState.UNDER_RECOVERY))
-                || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
-                        - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ignoring congestion report from " + runtimeId);
-            }
-            return;
-        } else {
-            try {
-                FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
-                int inflowRate = message.getInflowRate();
-                int outflowRate = message.getOutflowRate();
-                List<String> currentComputeLocations = new ArrayList<String>();
-                currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
-                        .getConnectionId().getFeedId()));
-                int computeCardinality = currentComputeLocations.size();
-                int requiredCardinality = (int) Math
-                        .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
-                int additionalComputeNodes = requiredCardinality - computeCardinality;
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
-                            + additionalComputeNodes);
-                }
-
-                List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
-
-                // Step 1) Alter the original feed job to adjust the cardinality
-                JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
-                        .getConnectionId());
-                helperComputeNodes.addAll(currentComputeLocations);
-                List<String> newLocations = new ArrayList<String>();
-                newLocations.addAll(currentComputeLocations);
-                newLocations.addAll(helperComputeNodes);
-                FeedMetadataUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
-
-                // Step 2) send prepare to  stall message
-                gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
-
-                // Step 3) run the altered job specification
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("New Job after adjusting to the workload " + jobSpec);
-                }
-
-                Thread.sleep(10000);
-                runJob(jobSpec, false);
-                lastModified = message.getConnectionId();
-                lastModifiedTimestamp = System.currentTimeMillis();
-
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
-                }
-                throw new AsterixException(e);
-            }
-        }
-    }
-
-    @Override
-    public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
-        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
-        if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
-            }
-            return;
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Processing scale-in message " + message);
-            }
-            FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
-            JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
-                    .getConnectionId());
-            int reducedCardinality = message.getReducedCardinaliy();
-            List<String> currentComputeLocations = new ArrayList<String>();
-            currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
-                    .getFeedId()));
-            FeedMetadataUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
-                    currentComputeLocations);
-
-            gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
-            Thread.sleep(3000);
-            JobId newJobId = runJob(jobSpec, false);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
-            }
-
-        }
-    }
-
-    private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
-            throws Exception {
-        // Step 1) send prepare to  stall message
-        PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
-        List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-        List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-
-        Set<String> operatorLocations = new HashSet<String>();
-
-        operatorLocations.addAll(intakeLocations);
-        operatorLocations.addAll(computeLocations);
-        operatorLocations.addAll(storageLocations);
-
-        JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
-        runJob(messageJobSpec, true);
-
-        // Step 2)
-        TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
-        messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
-        runJob(messageJobSpec, true);
-    }
-
-    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobId jobId = hcc.startJob(spec);
-        if (waitForCompletion) {
-            hcc.waitForCompletion(jobId);
-        }
-        return jobId;
-    }
-
-    @Override
-    public void submitFeedRuntimeReport(FeedReportMessage report) {
-        String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
-        Pair<Integer, Integer> value = feedMetrics.get(key);
-        if (value == null) {
-            value = new Pair<Integer, Integer>(report.getValue(), 1);
-            feedMetrics.put(key, value);
-        } else {
-            value.first = value.first + report.getValue();
-            value.second = value.second + 1;
-        }
-    }
-
-    @Override
-    public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
-        int rVal;
-        String key = "" + connectionId + ":" + runtimeType;
-        Pair<Integer, Integer> value = feedMetrics.get(key);
-        if (value == null) {
-            rVal = UNKNOWN;
-        } else {
-            rVal = value.first / value.second;
-        }
-        return rVal;
-    }
-
-    private List<String> getNodeForSubstitution(int nRequired) {
-        List<String> nodeIds = new ArrayList<String>();
-        Iterator<NodeLoadReport> it = null;
-        int nAdded = 0;
-        while (nAdded < nRequired) {
-            it = nodeReports.iterator();
-            while (it.hasNext()) {
-                nodeIds.add(it.next().getNodeId());
-                nAdded++;
-            }
-        }
-        return nodeIds;
-    }
-
-    @Override
-    public synchronized List<String> getNodes(int required) {
-        Iterator<NodeLoadReport> it;
-        List<String> allocated = new ArrayList<String>();
-        while (allocated.size() < required) {
-            it = nodeReports.iterator();
-            while (it.hasNext() && allocated.size() < required) {
-                allocated.add(it.next().getNodeId());
-            }
-        }
-        return allocated;
-    }
-
-    @Override
-    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
-        System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
-        FeedConnectionId connectionId = mesg.getConnectionId();
-        List<String> destinationLocations = new ArrayList<String>();
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-
-        destinationLocations.addAll(storageLocations);
-        destinationLocations.addAll(collectLocations);
-        JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
-                destinationLocations);
-        runJob(messageJobSpec, true);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
-        }
-        IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
-        trackingManager.disableAcking(connectionId);
-    }
-
-    @Override
-    public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
-        feedActivities.put(connectionId, activity);
-    }
-
-    @Override
-    public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
-        return feedActivities.get(connectionId);
-    }
-
-    @Override
-    public Collection<FeedActivity> getFeedActivities() {
-        return feedActivities.values();
-    }
-
-    @Override
-    public void removeFeedActivity(FeedConnectionId connectionId) {
-        feedActivities.remove(connectionId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java
deleted file mode 100644
index bff1a4d..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedMessageReceiver.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.logging.Level;
-
-import org.apache.asterix.app.external.CentralFeedManager.AQLExecutor;
-import org.apache.asterix.external.feed.api.IFeedLoadManager;
-import org.apache.asterix.external.feed.api.IFeedMessage.MessageType;
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.message.FeedCongestionMessage;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.external.feed.watch.NodeLoadReport;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.json.JSONObject;
-
-public class FeedMessageReceiver extends MessageReceiver<String> {
-
-    private static boolean initialized;
-
-    private final IFeedLoadManager feedLoadManager;
-    private final IFeedTrackingManager feedTrackingManager;
-
-    public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
-        this.feedLoadManager = centralFeedManager.getFeedLoadManager();
-        this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
-    }
-
-    @Override
-    public void processMessage(String message) throws Exception {
-        JSONObject obj = new JSONObject(message);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message " + obj);
-        }
-        MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
-        switch (messageType) {
-            case XAQL:
-                if (!initialized) {
-                    FeedBootstrap.setUpInitialArtifacts();
-                    initialized = true;
-                }
-                AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
-                break;
-            case CONGESTION:
-                feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
-                break;
-            case FEED_REPORT:
-                feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
-                break;
-            case NODE_REPORT:
-                feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
-                break;
-            case SCALE_IN_REQUEST:
-                feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
-                break;
-            case STORAGE_REPORT:
-                FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
-                break;
-            case COMMIT_ACK:
-                feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
-                break;
-            case THROTTLING_ENABLED:
-                feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
-            default:
-                break;
-        }
-
-    }
-
-    @Override
-    public void emptyInbox() throws HyracksDataException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index d8f1893..2de0266 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -32,10 +32,6 @@ import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.external.feed.message.PrepareStallMessage;
-import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
-import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
@@ -92,29 +88,6 @@ public class FeedOperations {
         return new Pair<JobSpecification, IAdapterFactory>(spec, adapterFactory);
     }
 
-    public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
-            throws AsterixException, AlgebricksException {
-
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IOperatorDescriptor feedMessenger = null;
-        AlgebricksPartitionConstraint messengerPc = null;
-
-        List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
-                locations);
-
-        feedMessenger = p.first;
-        messengerPc = p.second;
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
-        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
-        spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
-        spec.addRoot(nullSink);
-
-        return spec;
-    }
-
     /**
      * Builds the job spec for sending message to an active feed to disconnect it from the
      * its source.
@@ -162,66 +135,6 @@ public class FeedOperations {
 
     }
 
-    public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
-            Collection<String> collectLocations) throws AsterixException {
-        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
-        try {
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
-                    messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
-            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
-        } catch (AlgebricksException ae) {
-            throw new AsterixException(ae);
-        }
-        return messageJobSpec;
-    }
-
-    public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
-            ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
-        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
-        try {
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
-                    messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
-            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
-        } catch (AlgebricksException ae) {
-            throw new AsterixException(ae);
-        }
-        return messageJobSpec;
-    }
-
-    public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
-            List<String> collectLocations) throws AsterixException {
-        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
-        try {
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
-                    messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
-            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
-        } catch (AlgebricksException ae) {
-            throw new AsterixException(ae);
-        }
-        return messageJobSpec;
-    }
-
-    public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
-            Collection<String> targetLocations) throws AsterixException {
-        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
-        try {
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
-                    messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
-            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
-        } catch (AlgebricksException ae) {
-            throw new AsterixException(ae);
-        }
-        return messageJobSpec;
-    }
-
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
-            JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
-        FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
-        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
-                feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
-        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
-    }
-
     private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
             JobSpecification jobSpec, FeedConnectionId feedConenctionId, IFeedMessage feedMessage,
             Collection<String> locations) throws AlgebricksException {
@@ -232,17 +145,6 @@ public class FeedOperations {
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
     }
 
-    private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
-            AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
-                messengerPc);
-        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
-        messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
-        messageJobSpec.addRoot(nullSink);
-        return messageJobSpec;
-    }
-
     private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
             JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
             FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java
deleted file mode 100644
index 29230c1..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedTrackingManager.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedTrackingManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedTrackingManager implements IFeedTrackingManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
-
-    private final BitSet allOnes;
-
-    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
-    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
-
-    public FeedTrackingManager() {
-        byte[] allOneBytes = new byte[128];
-        Arrays.fill(allOneBytes, (byte) 0xff);
-        allOnes = BitSet.valueOf(allOneBytes);
-        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
-        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
-    }
-
-    @Override
-    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
-        AckId ackId = getAckId(ackMessage);
-        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
-        if (acksForConnection == null) {
-            acksForConnection = new HashMap<AckId, BitSet>();
-            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
-            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
-        }
-        BitSet currentAcks = acksForConnection.get(ackId);
-        if (currentAcks == null) {
-            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
-            acksForConnection.put(ackId, currentAcks);
-        } else {
-            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
-        }
-        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
-            }
-            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
-            if (maxBaseAckedForConnection == null) {
-                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
-                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
-            }
-            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
-            if (maxBaseAckedValue == null) {
-                maxBaseAckedValue = ackMessage.getBase();
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
-                }
-            }
-
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
-            }
-        }
-    }
-
-    public synchronized void disableTracking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-    }
-
-    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
-        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-        String collectLocation = collectLocations.get(partition);
-        Set<String> messageDestinations = new HashSet<String>();
-        messageDestinations.add(collectLocation);
-        messageDestinations.addAll(storageLocations);
-        try {
-            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
-            CentralFeedManager.runJob(spec, false);
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
-            }
-        }
-    }
-
-    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
-        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
-    }
-
-    private static class AckId {
-        private FeedConnectionId connectionId;
-        private int intakePartition;
-        private int base;
-
-        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
-            this.connectionId = connectionId;
-            this.intakePartition = intakePartition;
-            this.base = base;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (!(o instanceof AckId)) {
-                return false;
-            }
-            AckId other = (AckId) o;
-            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
-                    && other.getBase() == base;
-        }
-
-        @Override
-        public String toString() {
-            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return toString().hashCode();
-        }
-
-        public FeedConnectionId getConnectionId() {
-            return connectionId;
-        }
-
-        public int getIntakePartition() {
-            return intakePartition;
-        }
-
-        public int getBase() {
-            return base;
-        }
-
-    }
-
-    @Override
-    public void disableAcking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Acking disabled for " + connectionId);
-        }
-    }
-
-}



Mime
View raw message