asterixdb-notifications mailing list archives

From "Xikui Wang (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-1950][ING][API] REST API for ActiveEntity stats
Date Thu, 29 Jun 2017 04:04:26 GMT
Xikui Wang has submitted this change and it was merged.

Change subject: [ASTERIXDB-1950][ING][API] REST API for ActiveEntity stats
......................................................................


[ASTERIXDB-1950][ING][API] REST API for ActiveEntity stats

- user model changes: yes
  Added ActiveEntity stats REST API.
- storage format changes: no
- interface changes: yes
  Changed ActiveEntityEventsListener & EventSubscriber interfaces.
  Added getStats method to IActiveRuntime.

Details:
1. Added HttpAPI for active feed stats.
2. Replaced FeedEventSubscriber with WaitForStateSubscriber.
3. Added StatsSubscriber for monitoring stats request.
4. Moved the message related methods from FeedEventsListener to
   ActiveEntityEventsListener for possible reuses in other cases.
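
As a rough illustration of item 2, the sketch below shows how a subscriber
that waits for a target state might work. It is not the code from this
change: State, Subscriber, WaitForState, Listener and onEvent are hypothetical,
simplified stand-ins for ActivityState, IActiveEventSubscriber,
WaitForStateSubscriber and the events listener, and the real classes carry
more state (job ids, entity ids, exceptions).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ActivityState; the real enum has more values.
enum State { CREATED, STARTING, STARTED, STOPPED }

// Hypothetical, reduced version of a subscriber contract.
interface Subscriber {
    void onEvent(State newState);             // called by the listener on each state change
    boolean isDone();                         // true once no more events are wanted
    void sync() throws InterruptedException;  // block until isDone() becomes true
}

// Completes as soon as the listener reports the target state.
class WaitForState implements Subscriber {
    private final State target;
    private boolean done;

    WaitForState(State target) {
        this.target = target;
    }

    @Override
    public synchronized void onEvent(State newState) {
        if (newState == target) {
            done = true;
            notifyAll(); // wake up any thread blocked in sync()
        }
    }

    @Override
    public synchronized boolean isDone() {
        return done;
    }

    @Override
    public synchronized void sync() throws InterruptedException {
        while (!done) {
            wait();
        }
    }
}

// Keeps subscribers only for as long as they are not done.
class Listener {
    private final List<Subscriber> subscribers = new ArrayList<>();
    private State state = State.CREATED;

    synchronized void subscribe(Subscriber s) {
        s.onEvent(state); // the target state may already have been reached
        if (!s.isDone()) {
            subscribers.add(s);
        }
    }

    synchronized void setState(State next) {
        state = next;
        subscribers.forEach(s -> s.onEvent(next));
        subscribers.removeIf(Subscriber::isDone); // subscription ends when done
    }

    synchronized int subscriberCount() {
        return subscribers.size();
    }
}
```

A caller would subscribe a WaitForState, trigger the operation that changes
the state, and then call sync() to block until the target state is reported.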

Change-Id: I46b48b52a1c9906510c5bdce778d1672845f75ca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1748
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
---
M asterixdb/asterix-active/pom.xml
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
R asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
41 files changed, 936 insertions(+), 331 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; Verified

Objections:
  Jenkins: Violations found



diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 950e4d6..3dd24b6 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -42,5 +42,13 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-dataflow-std</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-nc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index 2669990..1141912 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.active;
 
+import java.util.Objects;
+
 import org.apache.hyracks.api.job.JobId;
 
 public class ActiveEvent {
@@ -27,7 +29,8 @@
         JOB_STARTED,
         JOB_FINISHED,
         PARTITION_EVENT,
-        EXTENSION_EVENT
+        EXTENSION_EVENT,
+        STATS_UPDATED
     }
 
     private final JobId jobId;
@@ -64,6 +67,24 @@
 
     @Override
     public String toString() {
-        return "JobId:" + jobId + ", " + "EntityId:" + entityId + ", " + "Kind" + eventKind;
+        return "JobId:" + jobId + "," + "EntityId:" + entityId + ", " + "Kind" + eventKind;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof ActiveEvent)) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
+        ActiveEvent other = (ActiveEvent) o;
+        return Objects.equals(entityId, other.entityId) && Objects.equals(eventKind, other.eventKind) && Objects
+                .equals(eventObject, other.eventObject);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jobId, entityId, eventKind, eventObject);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 44d6dae..fcf2be9 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -28,11 +28,16 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActiveStatsResponse;
+import org.apache.asterix.active.message.StatsRequestMessage;
 import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.log4j.Logger;
 
 public class ActiveManager {
@@ -44,14 +49,16 @@
     private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
     private final ConcurrentFramePool activeFramePool;
     private final String nodeId;
+    private final INCServiceContext serviceCtx;
     private volatile boolean shutdown;
 
-    public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize)
-            throws HyracksDataException {
+    public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize,
+            INCServiceContext serviceCtx) throws HyracksDataException {
         this.executor = executor;
         this.nodeId = nodeId;
         this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
         this.runtimes = new ConcurrentHashMap<>();
+        this.serviceCtx = serviceCtx;
     }
 
     public ConcurrentFramePool getFramePool() {
@@ -78,16 +85,44 @@
         return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
     }
 
-    public void submit(ActiveManagerMessage message) {
+    public void submit(ActiveManagerMessage message) throws HyracksDataException {
         switch (message.getKind()) {
             case ActiveManagerMessage.STOP_ACTIVITY:
                 stopRuntime(message);
                 break;
+            case ActiveManagerMessage.REQUEST_STATS:
+                requestStats((StatsRequestMessage) message);
+                break;
             default:
                 LOGGER.warn("Unknown message type received: " + message.getKind());
         }
     }
 
+    private void requestStats(StatsRequestMessage message) throws HyracksDataException {
+        try {
+            ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+            IActiveRuntime runtime = runtimes.get(runtimeId);
+            long reqId = message.getReqId();
+            if (runtime == null) {
+                LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId);
+                // Send a failure message
+                ((NodeControllerService) serviceCtx.getControllerService())
+                        .sendApplicationMessageToCC(
+                                JavaSerializationUtils
+                                        .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException(
+                                                ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))),
+                                null);
+                return;
+            }
+            String stats = runtime.getStats();
+            ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
+            ((NodeControllerService) serviceCtx.getControllerService())
+                    .sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     public void shutdown() {
         LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
         Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
index f1f5876..882ed11 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.active;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 public class ActiveRuntimeId implements Serializable {
 
@@ -27,13 +28,11 @@
     private final EntityId entityId;
     private final String runtimeName;
     private final int partition;
-    private final int hashCode;
 
     public ActiveRuntimeId(EntityId entityId, String runtimeName, int partition) {
         this.entityId = entityId;
         this.runtimeName = runtimeName;
         this.partition = partition;
-        this.hashCode = toString().hashCode();
     }
 
     @Override
@@ -56,7 +55,7 @@
 
     @Override
     public int hashCode() {
-        return hashCode;
+        return Objects.hash(entityId, runtimeName, partition);
     }
 
     public String getRuntimeName() {
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 98a6979..a7d7796 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,12 +26,13 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
         implements IActiveRuntime {
 
-    private final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
     /** A unique identifier for the runtime **/
@@ -88,15 +89,15 @@
         try {
             // notify cc that runtime has been registered
             ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
+                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), null);
             start();
         } catch (InterruptedException e) {
             LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e);
             Thread.currentThread().interrupt();
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         } catch (Exception e) {
             LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e);
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         } finally {
             synchronized (this) {
                 done = true;
@@ -111,10 +112,10 @@
         activeManager.deregisterRuntime(runtimeId);
         try {
             ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
+                    ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), null);
         } catch (Exception e) {
             LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e);
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         } finally {
             LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable");
         }
@@ -124,4 +125,9 @@
     public final IFrameWriter getInputFrameWriter(int index) {
         return null;
     }
+
+    @Override
+    public JobId getJobId() {
+        return ctx.getJobletContext().getJobId();
+    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index c8abb84..af8f5ca 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -20,6 +20,10 @@
 
 public enum ActivityState {
     /**
+     * The initial state of an activity.
+     */
+    CREATED,
+    /**
      * The starting state and a possible terminal state. Next state can only be {@code ActivityState.STARTING}
      */
     STOPPED,
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
index cdf702d..9e20e2f 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.active;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * A unique identifier for a data feed.
@@ -30,13 +31,11 @@
     private final String extensionName;
     private final String dataverse;
     private final String entityName;
-    private final int hashCode;
 
     public EntityId(String extentionName, String dataverse, String entityName) {
         this.extensionName = extentionName;
         this.dataverse = dataverse;
         this.entityName = entityName;
-        this.hashCode = toString().hashCode();
     }
 
     public String getDataverse() {
@@ -52,17 +51,17 @@
         if (o == null || !(o instanceof EntityId)) {
             return false;
         }
-        if (this == o || ((EntityId) o).getExtensionName().equals(extensionName)
-                && ((EntityId) o).getEntityName().equals(entityName)
-                && ((EntityId) o).getDataverse().equals(dataverse)) {
+        if (o == this) {
             return true;
         }
-        return false;
+        EntityId other = (EntityId) o;
+        return Objects.equals(other.dataverse, dataverse) && Objects.equals(other.entityName, entityName) &&
+                Objects.equals(other.extensionName, extensionName);
     }
 
     @Override
     public int hashCode() {
-        return hashCode;
+        return Objects.hash(dataverse, entityName, extensionName);
     }
 
     @Override
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ee8e776..4bc02f3 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -41,16 +41,6 @@
     ActivityState getState();
 
     /**
-     * get a subscriber that waits till state has been reached.
-     *
-     * @param state
-     *            the desired state
-     * @throws HyracksDataException
-     *             a failure happened while waiting for the state
-     */
-    IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException;
-
-    /**
      * @return the active entity id
      */
     EntityId getEntityId();
@@ -62,4 +52,31 @@
      */
     boolean isEntityUsingDataset(IDataset dataset);
 
+    /**
+     * subscribe to events. subscription ends when subscriber.done() returns true
+     *
+     * @param subscriber
+     * @throws HyracksDataException
+     */
+    void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException;
+
+    /**
+     * The most recent acquired stats for the active entity
+     *
+     * @return
+     */
+    String getStats();
+
+    /**
+     * @return The timestamp of the most recent acquired stats for the active entity
+     */
+    long getStatsTimeStamp();
+
+    /**
+     * refresh the stats
+     *
+     * @param timeout
+     * @throws HyracksDataException
+     */
+    void refreshStats(long timeout) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
index 7be5737..69f7f1c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.active;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * An active event subscriber that subscribe to events related to active entity
  */
@@ -25,18 +27,22 @@
 
     /**
      * Notify the subscriber of a new event
+     *
      * @param event
+     * @throws HyracksDataException
      */
-    void notify(ActiveEvent event);
+    void notify(ActiveEvent event) throws HyracksDataException;
 
     /**
      * Checkcs whether the subscriber is done receiving events
+     *
      * @return
      */
-    boolean done();
+    boolean isDone();
 
     /**
      * Wait until the terminal event has been received
+     *
      * @throws InterruptedException
      */
     void sync() throws InterruptedException;
@@ -45,4 +51,12 @@
      * Stop watching events
      */
     void unsubscribe();
+
+    /**
+     * callback upon successful subscription
+     *
+     * @param eventsListener
+     * @throws HyracksDataException
+     */
+    void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 528c220..f37b2e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.active;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 
 public interface IActiveRuntime {
 
@@ -34,4 +35,16 @@
      * @throws InterruptedException
      */
     void stop() throws HyracksDataException, InterruptedException;
+
+    /**
+     * @return the job id associated with this active runtime
+     */
+    JobId getJobId();
+
+    /**
+     * @return the runtime stats for monitoring purposes
+     */
+    default String getStats() {
+        return "\"Runtime stats is not available.\"";
+    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 231ec25..9772698 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -27,15 +27,14 @@
 
 public class ActiveManagerMessage implements INcAddressedMessage {
     public static final byte STOP_ACTIVITY = 0x00;
+    public static final byte REQUEST_STATS = 0x01;
 
     private static final long serialVersionUID = 1L;
     private final byte kind;
-    private final String src;
     private final Serializable payload;
 
-    public ActiveManagerMessage(byte kind, String src, Serializable payload) {
+    public ActiveManagerMessage(byte kind, Serializable payload) {
         this.kind = kind;
-        this.src = src;
         this.payload = payload;
     }
 
@@ -45,10 +44,6 @@
 
     public byte getKind() {
         return kind;
-    }
-
-    public String getSrc() {
-        return src;
     }
 
     @Override
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 335121a..9391044 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.active.message;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
@@ -29,18 +30,14 @@
 
 public class ActivePartitionMessage implements ICcAddressedMessage {
 
+    private static final long serialVersionUID = 1L;
     public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
     public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
     public static final byte GENERIC_EVENT = 0x02;
-    private static final long serialVersionUID = 1L;
     private final ActiveRuntimeId activeRuntimeId;
     private final JobId jobId;
     private final Serializable payload;
     private final byte event;
-
-    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event) {
-        this(activeRuntimeId, jobId, event, null);
-    }
 
     public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) {
         this.activeRuntimeId = activeRuntimeId;
@@ -73,6 +70,24 @@
 
     @Override
     public String toString() {
-        return ActivePartitionMessage.class.getSimpleName();
+        return ActivePartitionMessage.class.getSimpleName() + event;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(activeRuntimeId, jobId, payload, event);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof ActivePartitionMessage)) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
+        ActivePartitionMessage other = (ActivePartitionMessage) o;
+        return Objects.equals(other.activeRuntimeId, activeRuntimeId) && Objects.equals(other.jobId, jobId) && Objects
+                .equals(other.payload, payload);
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
new file mode 100644
index 0000000..8738a06
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INcResponse;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ActiveStatsResponse implements ICcAddressedMessage, INcResponse {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String stats;
+    private final Exception failure;
+
+    public ActiveStatsResponse(long reqId, String stats, Exception failure) {
+        this.reqId = reqId;
+        this.stats = stats;
+        this.failure = failure;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setResult(MutablePair<ResponseState, Object> result) {
+        ResponseState responseState = result.getLeft();
+        if (failure != null) {
+            result.setLeft(ResponseState.FAILURE);
+            result.setRight(failure);
+            return;
+        }
+        switch (responseState) {
+            case UNINITIALIZED:
+                // First to arrive
+                result.setRight(new ArrayList<String>());
+                // No failure, change state to success
+                result.setLeft(ResponseState.SUCCESS);
+                // Fallthrough
+            case SUCCESS:
+                List<String> response = (List<String>) result.getRight();
+                response.add(stats);
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        broker.respond(reqId, this);
+    }
+
+}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
new file mode 100644
index 0000000..8fa5f19
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+
+public class StatsRequestMessage extends ActiveManagerMessage {
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+
+    public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
+        super(kind, payload);
+        this.reqId = reqId;
+    }
+
+    public long getReqId() {
+        return reqId;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
new file mode 100644
index 0000000..e02f09b
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.external.feed.watch.StatsSubscriber;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ActiveStatsApiServlet extends AbstractServlet {
+
+    private static final int DEFAULT_EXPIRE_TIME = 2000;
+    private final ActiveLifecycleListener activeLifecycleListener;
+
+    public ActiveStatsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+        super(ctx, paths);
+        this.activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+    }
+
+    private JsonNode constructNode(ObjectMapper om, IActiveEntityEventsListener eventListener, long currentTime,
+            long ttl) throws InterruptedException, IOException {
+        long statsTimeStamp = eventListener.getStatsTimeStamp();
+        if (currentTime - statsTimeStamp > ttl) {
+            StatsSubscriber subscriber = new StatsSubscriber(eventListener);
+            // refresh
+            eventListener.refreshStats(5000);
+            subscriber.sync();
+        }
+        return om.readTree(eventListener.getStats());
+    }
+
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) throws Exception {
+        // Obtain all feed status
+        String localPath = localPath(request);
+        int expireTime;
+        IActiveEntityEventsListener[] listeners = activeLifecycleListener.getNotificationHandler().getEventListeners();
+        ObjectMapper om = new ObjectMapper();
+        om.enable(SerializationFeature.INDENT_OUTPUT);
+        ObjectNode resNode = om.createObjectNode();
+
+        if (localPath.length() == 0 || localPath.length() == 1) {
+            expireTime = DEFAULT_EXPIRE_TIME;
+        } else {
+            expireTime = Integer.valueOf(localPath.substring(1));
+        }
+        long currentTime = System.currentTimeMillis();
+        for (int iter1 = 0; iter1 < listeners.length; iter1++) {
+            resNode.putPOJO(listeners[iter1].getEntityId().toString(),
+                    constructNode(om, listeners[iter1], currentTime, expireTime));
+        }
+
+        // Construct Response
+        PrintWriter responseWriter = response.writer();
+        responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(resNode));
+        responseWriter.flush();
+    }
+}
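
Note on the servlet above: the trailing segment of /admin/active/* acts as a staleness limit for cached stats. The sketch below isolates that logic so it can be tried without a running cluster. It assumes that localPath(request) returns "" or "/" when no value is given and "/<number>" otherwise, as the length checks in get() suggest, and that the value is in milliseconds because it is compared against System.currentTimeMillis(). The class ExpireTimeSketch and the helpers parseExpireTime and needsRefresh are hypothetical names and are not part of this change. Integer.parseInt is used in place of Integer.valueOf; both yield the same value for valid input.

```java
public class ExpireTimeSketch {
    // Same default as DEFAULT_EXPIRE_TIME in ActiveStatsApiServlet (assumed milliseconds).
    static final int DEFAULT_EXPIRE_TIME = 2000;

    // Hypothetical helper mirroring the branch in get(): an empty path or a bare "/"
    // selects the default; otherwise the text after the leading "/" is read as an int.
    static int parseExpireTime(String localPath) {
        if (localPath.length() == 0 || localPath.length() == 1) {
            return DEFAULT_EXPIRE_TIME;
        }
        return Integer.parseInt(localPath.substring(1));
    }

    // Hypothetical helper mirroring the check in constructNode(): a refresh is
    // requested only when the cached stats are older than the given TTL.
    static boolean needsRefresh(long currentTime, long statsTimeStamp, long ttl) {
        return currentTime - statsTimeStamp > ttl;
    }

    public static void main(String[] args) {
        System.out.println(parseExpireTime(""));
        System.out.println(parseExpireTime("/"));
        System.out.println(parseExpireTime("/5000"));
        System.out.println(needsRefresh(10_000L, 7_000L, parseExpireTime("/")));
    }
}
```

As in the servlet, a non-numeric segment is not handled here and would surface as a NumberFormatException.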
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 55b9adc..29bc95e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -200,7 +200,8 @@
         isShuttingdown = false;
 
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
-                activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
+                activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
+                this.ncServiceContext);
 
         if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
             String nodeId = getServiceContext().getNodeId();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8a7d757..79d660c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -73,10 +73,12 @@
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -937,7 +939,7 @@
             // #. add a new index with PendingAddOp
             index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
                     keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes,
-                    stmtCreateIndex.isEnforced(),false, MetadataUtil.PENDING_ADD_OP);
+                    stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
 
             // #. prepare to create the index artifact in NC.
@@ -1087,8 +1089,8 @@
     protected void validateIndexKeyFields(CreateIndexStatement stmtCreateIndex, List<Integer> keySourceIndicators,
             ARecordType aRecordType, ARecordType metaRecordType, List<List<String>> indexFields,
             List<IAType> indexFieldTypes) throws AlgebricksException {
-        ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators,
-                indexFieldTypes, stmtCreateIndex.getIndexType());
+        ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators, indexFieldTypes,
+                stmtCreateIndex.getIndexType());
     }
 
     protected void handleCreateTypeStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
@@ -1192,11 +1194,11 @@
                             MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
-                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
-                                    dataset));
-                        } else {
                             jobsToExecute.add(
-                                    IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset));
+                                    ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, dataset));
+                        } else {
+                            jobsToExecute
+                                    .add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset));
                         }
                     }
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
@@ -1957,7 +1959,8 @@
             EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
             ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
             ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
-            FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getActiveEntityListener(feedId);
+            ActiveEntityEventsListener listener =
+                    (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId);
             if (listener != null) {
                 throw new AlgebricksException("Feed " + feedId
                         + " is currently active and connected to the following dataset(s) \n" + listener.toString());
@@ -2028,7 +2031,8 @@
         DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory();
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
-        FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getActiveEntityListener(entityId);
+        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler
+                .getActiveEntityListener(entityId);
         if (listener != null) {
             throw new AlgebricksException("Feed " + feedName + " is started already.");
         }
@@ -2047,11 +2051,11 @@
                             compilationProvider, storageComponentProvider, qtFactory, hcc);
 
             JobSpecification feedJob = jobInfo.getLeft();
-            listener = new FeedEventsListener(appCtx, entityId, datasets, jobInfo.getRight().getLocations());
+            listener = new ActiveEntityEventsListener(appCtx, entityId, datasets, jobInfo.getRight(),
+                    FeedIntakeOperatorNodePushable.class.getSimpleName());
             activeEventHandler.registerListener(listener);
-            IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
+            IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STARTED);
             feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
-
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
             // We will need to design general exception handling mechanism for feeds.
             JobUtils.runJob(hcc, feedJob,
@@ -2077,11 +2081,12 @@
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
         // Obtain runtime info from ActiveListener
-        FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getActiveEntityListener(feedId);
+        ActiveEntityEventsListener listener =
+                (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId);
         if (listener == null) {
             throw new AlgebricksException("Feed " + feedName + " is not started.");
         }
-        IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STOPPED);
+        IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STOPPED);
         // Transaction
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2090,8 +2095,8 @@
             // validate
             FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx);
             // Construct ActiveMessage
-            for (int i = 0; i < listener.getSources().length; i++) {
-                String intakeLocation = listener.getSources()[i];
+            for (int i = 0; i < listener.getLocations().getLocations().length; i++) {
+                String intakeLocation = listener.getLocations().getLocations()[i];
                 FeedOperations.SendStopMessageToNode(appCtx, feedId, intakeLocation, i);
             }
             eventSubscriber.sync();
@@ -2231,8 +2236,7 @@
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (Index index : indexes) {
                     if (index.isSecondaryIndex()) {
-                        jobsToExecute
-                                .add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider));
+                        jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider));
                     }
                 }
             } else {
@@ -2257,8 +2261,7 @@
     }
 
     protected void prepareCompactJobsForExternalDataset(List<Index> indexes, Dataset ds,
-            List<JobSpecification> jobsToExecute, MetadataProvider metadataProvider)
-            throws AlgebricksException {
+            List<JobSpecification> jobsToExecute, MetadataProvider metadataProvider) throws AlgebricksException {
         for (int j = 0; j < indexes.size(); j++) {
             jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), metadataProvider));
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 35e0466..cd9138a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
 import org.apache.asterix.api.http.server.DdlApiServlet;
 import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
+import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.FullApiServlet;
 import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.QueryApiServlet;
@@ -233,6 +234,7 @@
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_NODE_DETAIL); // must not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.DIAGNOSTICS);
+        addServlet(jsonAPIServer, Servlets.ACTIVE_STATS);
         return jsonAPIServer;
     }
 
@@ -305,6 +307,8 @@
                 return new ClusterControllerDetailsApiServlet(ctx, paths);
             case Servlets.DIAGNOSTICS:
                 return new DiagnosticsApiServlet(ctx, paths, appCtx);
+            case Servlets.ACTIVE_STATS:
+                return new ActiveStatsApiServlet(ctx, paths, appCtx);
             default:
                 throw new IllegalStateException(String.valueOf(key));
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 5932aff..de2ca11 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -72,6 +72,7 @@
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
     }
 
+    @Override
     public long newRequestId() {
         return REQUEST_ID_GENERATOR.incrementAndGet();
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 6155450..09c4983 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -381,7 +381,7 @@
 
     public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation,
             Integer partition) throws Exception {
-        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "SRC",
+        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
                 new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
new file mode 100644
index 0000000..2dc1782
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.active;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveRuntime;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveMessageTest {
+
+    protected boolean cleanUp = true;
+    private static String EXPECTED_STATS = "Mock stats";
+
+    @Before
+    public void setUp() throws Exception {
+        ExecutionTestUtil.setUp(cleanUp);
+    }
+
+    @Test
+    public void refreshStatsTest() throws HyracksException {
+        // Entities to be used
+        EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity");
+        ActiveRuntimeId activeRuntimeId =
+                new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0);
+        List<IDataset> datasetList = new ArrayList<>();
+        AlgebricksAbsolutePartitionConstraint partitionConstraint =
+                new AlgebricksAbsolutePartitionConstraint(new String[] { "asterix_nc1" });
+        String requestedStats;
+        CcApplicationContext appCtx =
+                (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+        ActiveLifecycleListener activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeJobNotificationHandler = activeLifecycleListener.getNotificationHandler();
+        JobId jobId = new JobId(1);
+
+        // Mock ActiveRuntime
+        IActiveRuntime mockRuntime = Mockito.mock(IActiveRuntime.class);
+        Mockito.when(mockRuntime.getRuntimeId()).thenReturn(activeRuntimeId);
+        Mockito.when(mockRuntime.getStats()).thenReturn(EXPECTED_STATS);
+
+        // Mock JobSpecification
+        JobSpecification jobSpec = Mockito.mock(JobSpecification.class);
+        Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
+                .thenReturn(entityId);
+
+        // Add event listener
+        ActiveEntityEventsListener eventsListener =
+                new ActiveEntityEventsListener(appCtx, entityId, datasetList, partitionConstraint,
+                        FeedIntakeOperatorNodePushable.class.getSimpleName());
+        activeJobNotificationHandler.registerListener(eventsListener);
+
+        // Register mock runtime
+        NCAppRuntimeContext nc1AppCtx =
+                (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext();
+        nc1AppCtx.getActiveManager().registerRuntime(mockRuntime);
+
+        // Check init stats
+        requestedStats = eventsListener.getStats();
+        Assert.assertTrue(requestedStats.equals("N/A"));
+
+        // Update stats of not-started job
+        eventsListener.refreshStats(1000);
+        requestedStats = eventsListener.getStats();
+        Assert.assertTrue(requestedStats.equals("N/A"));
+
+        // Update stats of created/started job without joined partition
+        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
+        activeLifecycleListener.notifyJobStart(jobId);
+        eventsListener.refreshStats(1000);
+        requestedStats = eventsListener.getStats();
+        Assert.assertTrue(requestedStats.equals("N/A"));
+
+        // Fake partition message and notify eventListener
+        ActivePartitionMessage partitionMessage =
+                new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED,
+                        null);
+        eventsListener.notify(new ActiveEvent(jobId, ActiveEvent.Kind.PARTITION_EVENT, entityId, partitionMessage));
+        eventsListener.refreshStats(100000);
+        requestedStats = eventsListener.getStats();
+        Assert.assertTrue(requestedStats.contains(EXPECTED_STATS));
+        ObjectMapper objectMapper = new ObjectMapper();
+        try {
+            objectMapper.readTree(requestedStats);
+        } catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+        // Ask for runtime that is not registered
+        HyracksDataException expectedException = null;
+        nc1AppCtx.getActiveManager().deregisterRuntime(activeRuntimeId);
+        try {
+            eventsListener.refreshStats(100000);
+        } catch (HyracksDataException e) {
+            expectedException = e;
+        }
+        Assert.assertTrue(expectedException != null
+                && expectedException.getErrorCode() == ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME);
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 5deaf54..5b28d3f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -191,10 +191,12 @@
     public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED = 3081;
     public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC = 3082;
     public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3083;
-    public static final int PROVIDER_STREAM_RECORD_READER_DUPLICATE_FORMAT_MAPPING = 3084;
+    public static final int CANNOT_WAIT_FOR_STATE = 3084;
     public static final int FEED_UNKNOWN_ADAPTER_NAME = 3085;
     public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
     public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
+    public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088;
+    public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3089;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 69c0ca0..33a8ff3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -56,4 +56,9 @@
      * @param response
      */
     void respond(Long reqId, INcResponse response);
+
+    /**
+     * @return a new request id
+     */
+    long newRequestId();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index a8a7390..0f7ab4d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -41,6 +41,7 @@
     public static final String CLUSTER_STATE_NODE_DETAIL = "/admin/cluster/node/*";
     public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*";
     public static final String DIAGNOSTICS = "/admin/diagnostics";
+    public static final String ACTIVE_STATS = "/admin/active/*";
 
     private Servlets() {
     }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index c755e40..8045531 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -52,8 +52,8 @@
 16 = Storage metadata directory of %1$s in %2$s couldn't be created
 17 = Unknown external file pending operation %1$s
 18 = Cannot convert the %1$s type to the %2$s type.
-19 = Can't convert integer types. The source type should be one of %1$s.
-20 = Can't convert integer types. The target type should be one of %1$s.
+19 = Cannot convert integer types. The source type should be one of %1$s.
+20 = Cannot convert integer types. The target type should be one of %1$s.
 21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s
 22 = The accessed field is untyped, but should be typed
 23 = %1$ss passed before getting back the responses from NCs
@@ -76,7 +76,7 @@
 1020 = Cannot autogenerate a primary key for primary key of type %1$s. Autogenerated primary keys must be of type %2$s
 1021 = The primary key field \"%1$s\" cannot be nullable
 1022 = Field of type %1$s cannot be used as a primary key field
-1023 = Can't drop dataset %1$s since it is connected to active entity: %2$s
+1023 = Cannot drop dataset %1$s since it is connected to active entity: %2$s
 1024 = Identifier %1$s is not found in AQL+ meta-scope
 1025 = There is no such join type in AQL+
 1026 = The given function expression %1$s cannot utilize index
@@ -108,15 +108,15 @@
 3008 = Unable to ingest data
 3009 = Exception in get record type %1$s for feed
 3010 = Doesn't support Hive data with list of non-primitive types
-3011 = Can't get hive type for field of type %1$s
+3011 = Cannot get hive type for field of type %1$s
 3012 = Failed to get columns of record
-3013 = Can't deserialize Hive records with no closed columns
+3013 = Cannot deserialize Hive records with no closed columns
 3014 = Non-optional UNION type is not supported.
 3015 = Failed to get the type information for field %1$s
-3016 = can't parse null field
-3017 = can't parse hive list with null values
+3016 = Cannot parse null field
+3017 = Cannot parse hive list with null values
 3018 = Field %1$s of meta record is not an optional type so it cannot accept null value
-3019 = Can't get PK from record part
+3019 = Cannot get PK from record part
 3020 = This operation cannot be done when Feed %1$s is alive
 3021 = Malformed input stream
 3022 = Unknown data source type: %1$s
@@ -168,7 +168,7 @@
 3069 = Found COMMA before any list item
 3070 = Found COMMA while expecting a list item
 3071 = Found END_RECORD while expecting a list item
-3072 = Can't cast the %1$s type to the %2$s type
+3072 = Cannot cast the %1$s type to the %2$s type
 3073 = Missing deserializer method for constructor: %1$s
 3074 = This can not be an instance of %1$s
 3075 = Closed field %1$s has null value
@@ -178,12 +178,14 @@
 3079 = Cannot register runtime, active manager has been shutdown
 3080 = Unexpected feed datatype '%1$s'
 3081 = socket is not properly configured
-3082 = "Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s"
+3082 = Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s
 3083 = Duplicate feed adaptor name: %1$s
-3084 = Duplicate record reader format: %1$s
+3084 = Cannot wait for state %1$s. The only states that can be waited for are STARTED or STOPPED
 3085 = Unknown Adapter Name.
-3086 = Cannot find record reader %1$s with specified configuration.
+3086 = Cannot find record reader %1$s with specified configuration
 3087 = Cannot find function %1$s
+3088 = %1$s is not a valid runtime Id
+3089 = Cannot subscribe to events of a failed active entity
 
 # Lifecycle management errors
-4000 = Partition id %1$d for node %2$s already in use by node %3$s
\ No newline at end of file
+4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 213231b..bcf5e25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -61,4 +61,6 @@
     public abstract boolean stop() throws HyracksDataException;
 
     public abstract boolean handleException(Throwable th) throws HyracksDataException;
+
+    public abstract String getStats();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index d01859e..1e62159 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -39,6 +39,8 @@
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected static final long INTERVAL = 1000;
     protected boolean failed = false;
+    protected long incomingRecordsCount = 0;
+    protected long failedRecordsCount = 0;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
@@ -63,7 +65,10 @@
                     continue;
                 }
                 tb.reset();
-                parseAndForward(record);
+                incomingRecordsCount++;
+                if (!parseAndForward(record)) {
+                    failedRecordsCount++;
+                }
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
@@ -104,19 +109,20 @@
         }
     }
 
-    private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
+    private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException {
         try {
             dataParser.parse(record, tb.getDataOutput());
         } catch (Exception e) {
             LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
             feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
             // continue the outer loop
-            return;
+            return false;
         }
         tb.addFieldEndOffset();
         addMetaPart(tb, record);
         addPrimaryKeys(tb, record);
         tupleForwarder.addTuple(tb);
+        return true;
     }
 
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
@@ -187,4 +193,9 @@
     public IRecordDataParser<T> getParser() {
         return dataParser;
     }
+
+    public String getStats() {
+        return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": "
+                + failedRecordsCount + "}";
+    }
 }
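Note on the counters added above: a minimal, standalone sketch of the pattern, in which every record counts as incoming and a record whose parse fails also counts as failed. ParseStatsSketch, its process method and the parser argument are hypothetical names used for illustration and are not part of AsterixDB; only the two JSON key names are taken from the patch.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a feed data flow controller; not an AsterixDB class.
class ParseStatsSketch {
    private long incomingRecordsCount = 0;
    private long failedRecordsCount = 0;

    // 'parser' returns false when a record cannot be parsed, mirroring the
    // boolean result that parseAndForward returns in the patch.
    void process(List<String> records, Predicate<String> parser) {
        for (String record : records) {
            incomingRecordsCount++;
            if (!parser.test(record)) {
                failedRecordsCount++;
            }
        }
    }

    // Uses the same key names as FeedRecordDataFlowController.getStats() in the patch.
    String getStats() {
        return "{\"incoming-records-count\": " + incomingRecordsCount
                + ", \"failed-at-parser-records-count\": " + failedRecordsCount + "}";
    }
}
```

As in the patch, the counters here are plain long fields, so a caller reading them from another thread may see a slightly stale value. That is probably acceptable for monitoring output, but it is an assumption rather than something the change states.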
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 0d72682..cad11cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -29,6 +29,7 @@
 
     private final IStreamDataParser dataParser;
     private final AsterixInputStream stream;
+    protected long incomingRecordsCount = 0;
 
     public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) {
@@ -48,6 +49,7 @@
                 }
                 tb.addFieldEndOffset();
                 tupleForwarder.addTuple(tb);
+                incomingRecordsCount++;
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
@@ -83,4 +85,8 @@
         }
         return handled;
     }
+
+    public String getStats() {
+        return "{\"incoming-records-number\": " + incomingRecordsCount + "}";
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 8d80e6f..e6d81d3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -51,4 +51,8 @@
     public boolean resume() throws HyracksDataException {
         return controller.resume();
     }
+
+    public String getStats() {
+        return controller.getStats();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 11561b5..90a8852 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -76,12 +76,11 @@
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
-        this.spiller = fpa.spillToDiskOnCongestion()
-                ? new FrameSpiller(ctx,
-                        connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
-                                + runtimeId.getRuntimeName() + "_" + runtimeId.getPartition(),
-                        fpa.getMaxSpillOnDisk())
-                : null;
+        this.spiller = fpa.spillToDiskOnCongestion() ?
+                new FrameSpiller(ctx,
+                        connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_" + runtimeId.getPartition(),
+                        fpa.getMaxSpillOnDisk()) :
+                null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;
@@ -289,8 +288,8 @@
             while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
                 if (DEBUG) {
                     LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than "
-                            + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = "
-                            + spiller.usedBudget());
+                            + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = " + spiller
+                            .usedBudget());
                 }
                 spiller.wait();
             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
index 365c3ce..cee6fa9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -18,21 +18,135 @@
  */
 package org.apache.asterix.external.feed.management;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
 
-public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener {
+public class ActiveEntityEventsListener implements IActiveEntityEventsListener {
+
+    private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName());
+
+    enum RequestState {
+        INIT,
+        STARTED,
+        FINISHED
+    }
 
     // members
-    protected EntityId entityId;
-    protected List<IDataset> datasets;
     protected volatile ActivityState state;
     protected JobId jobId;
+    protected final List<IActiveEventSubscriber> subscribers = new ArrayList<>();
+    protected final ICcApplicationContext appCtx;
+    protected final EntityId entityId;
+    protected final List<IDataset> datasets;
+    protected final ActiveEvent statsUpdatedEvent;
+    protected long statsTimestamp;
+    protected String stats;
+    protected RequestState statsRequestState;
+    protected final String runtimeName;
+    protected final AlgebricksAbsolutePartitionConstraint locations;
+    protected int numRegistered;
+
+    public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId entityId, List<IDataset> datasets,
+            AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
+        this.appCtx = appCtx;
+        this.entityId = entityId;
+        this.datasets = datasets;
+        this.state = ActivityState.STOPPED;
+        this.statsTimestamp = Long.MIN_VALUE;
+        this.statsRequestState = RequestState.INIT;
+        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId);
+        this.stats = "N/A";
+        this.runtimeName = runtimeName;
+        this.locations = locations;
+        this.numRegistered = 0;
+    }
+
+    public synchronized void notify(ActiveEvent event) {
+        try {
+            LOGGER.finer("EventListener is notified.");
+            ActiveEvent.Kind eventKind = event.getEventKind();
+            switch (eventKind) {
+                case JOB_CREATED:
+                    state = ActivityState.CREATED;
+                    break;
+                case JOB_STARTED:
+                    start(event);
+                    break;
+                case JOB_FINISHED:
+                    finish();
+                    break;
+                case PARTITION_EVENT:
+                    handle((ActivePartitionMessage) event.getEventObject());
+                    break;
+                default:
+                    LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event);
+                    break;
+            }
+            notifySubscribers(event);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Unhandled Exception", e);
+        }
+    }
+
+    protected synchronized void handle(ActivePartitionMessage message) {
+        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+            numRegistered++;
+            if (numRegistered == locations.getLocations().length) {
+                state = ActivityState.STARTED;
+            }
+        }
+    }
+
+    private void finish() throws Exception {
+        IHyracksClientConnection hcc = appCtx.getHcc();
+        JobStatus status = hcc.getJobStatus(jobId);
+        state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
+        ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
+        activeLcListener.getNotificationHandler().removeListener(this);
+    }
+
+    private void start(ActiveEvent event) {
+        this.jobId = event.getJobId();
+        state = ActivityState.STARTING;
+    }
+
+    @Override
+    public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException {
+        if (this.state == ActivityState.FAILED) {
+            throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY);
+        }
+        subscriber.subscribed(this);
+        if (!subscriber.isDone()) {
+            subscribers.add(subscriber);
+        }
+    }
 
     @Override
     public EntityId getEntityId() {
@@ -52,4 +166,80 @@
     public JobId getJobId() {
         return jobId;
     }
+
+    @Override
+    public String getStats() {
+        return stats;
+    }
+
+    @Override
+    public long getStatsTimeStamp() {
+        return statsTimestamp;
+    }
+
+    public String formatStats(List<String> responses) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append("{" + "\"EntityId\": \"" + entityId + "\", ");
+        strBuilder.append("\"Stats\": [").append("\"" + responses.get(0) + "\"");
+        for (int i = 1; i < responses.size(); i++) {
+            strBuilder.append(", ").append("\"" + responses.get(i) + "\"");
+        }
+        strBuilder.append("]}");
+        return strBuilder.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void refreshStats(long timeout) throws HyracksDataException {
+        synchronized (this) {
+            if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) {
+                return;
+            } else {
+                statsRequestState = RequestState.STARTED;
+            }
+        }
+        ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        long reqId = messageBroker.newRequestId();
+        List<INcAddressedMessage> requests = new ArrayList<>();
+        List<String> ncs = Arrays.asList(locations.getLocations());
+        for (int i = 0; i < ncs.size(); i++) {
+            requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
+                    new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+        }
+        try {
+            List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
+            stats = formatStats(responses);
+            statsTimestamp = System.currentTimeMillis();
+            notifySubscribers(statsUpdatedEvent);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        // Unsynchronized write; skipped if the request above throws, leaving the state STARTED
+        statsRequestState = RequestState.FINISHED;
+    }
+
+    protected synchronized void notifySubscribers(ActiveEvent event) {
+        notifyAll();
+        Iterator<IActiveEventSubscriber> it = subscribers.iterator();
+        while (it.hasNext()) {
+            IActiveEventSubscriber subscriber = it.next();
+            if (subscriber.isDone()) {
+                it.remove();
+            } else {
+                try {
+                    subscriber.notify(event);
+                } catch (HyracksDataException e) {
+                    LOGGER.log(Level.WARNING, "Failed to notify subscriber", e);
+                }
+                if (subscriber.isDone()) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public AlgebricksAbsolutePartitionConstraint getLocations() {
+        return locations;
+    }
+
 }
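Note on formatStats above: a sketch of one possible alternative, assuming each per-partition response is itself a JSON object, as the getStats() methods in this change return. Wrapping such a response in double quotes without escaping its inner quotes yields text that a JSON parser would reject, and responses.get(0) fails on an empty list. The variant below embeds the responses directly as array elements and tolerates an empty list. StatsFormatSketch and its format method are hypothetical names, not part of the patch.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper; not an AsterixDB class.
class StatsFormatSketch {
    // Assumes every element of 'responses' is already a valid JSON value
    // and that entityId contains no characters that need JSON escaping.
    static String format(String entityId, List<String> responses) {
        // An empty list produces "[]" because StringJoiner falls back to prefix + suffix.
        StringJoiner stats = new StringJoiner(", ", "[", "]");
        for (String response : responses) {
            stats.add(response);
        }
        return "{\"EntityId\": \"" + entityId + "\", \"Stats\": " + stats + "}";
    }
}
```

Whether clients of the REST API expect the quoted form is not visible in this diff, so treat this as a suggestion to verify rather than a required fix.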
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
deleted file mode 100644
index 6f3b667..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveLifecycleListener;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEventSubscriber;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.external.feed.watch.FeedEventSubscriber;
-import org.apache.asterix.external.feed.watch.NoOpSubscriber;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobStatus;
-
-public class FeedEventsListener extends ActiveEntityEventsListener {
-    // constants
-    private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName());
-    // members
-    private final ICcApplicationContext appCtx;
-    private final String[] sources;
-    private final List<IActiveEventSubscriber> subscribers;
-    private int numRegistered;
-
-    public FeedEventsListener(ICcApplicationContext appCtx, EntityId entityId, List<IDataset> datasets,
-            String[] sources) {
-        this.appCtx = appCtx;
-        this.entityId = entityId;
-        this.datasets = datasets;
-        this.sources = sources;
-        subscribers = new ArrayList<>();
-        state = ActivityState.STOPPED;
-    }
-
-    @Override
-    public synchronized void notify(ActiveEvent event) {
-        try {
-            switch (event.getEventKind()) {
-                case JOB_STARTED:
-                    start(event);
-                    break;
-                case JOB_FINISHED:
-                    finish();
-                    break;
-                case PARTITION_EVENT:
-                    partition((ActivePartitionMessage) event.getEventObject());
-                    break;
-                default:
-                    LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event);
-                    break;
-            }
-            notifySubscribers(event);
-        } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "Unhandled Exception", e);
-        }
-    }
-
-    private synchronized void notifySubscribers(ActiveEvent event) {
-        notifyAll();
-        Iterator<IActiveEventSubscriber> it = subscribers.iterator();
-        while (it.hasNext()) {
-            IActiveEventSubscriber subscriber = it.next();
-            if (subscriber.done()) {
-                it.remove();
-            } else {
-                subscriber.notify(event);
-                if (subscriber.done()) {
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    private void partition(ActivePartitionMessage message) {
-        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
-            numRegistered++;
-            if (numRegistered == getSources().length) {
-                state = ActivityState.STARTED;
-            }
-        }
-    }
-
-    private void finish() throws Exception {
-        IHyracksClientConnection hcc = appCtx.getHcc();
-        JobStatus status = hcc.getJobStatus(jobId);
-        state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
-        ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        activeLcListener.getNotificationHandler().removeListener(this);
-    }
-
-    private void start(ActiveEvent event) {
-        this.jobId = event.getJobId();
-        state = ActivityState.STARTING;
-    }
-
-    @Override
-    public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException {
-        if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
-            throw new HyracksDataException("Can only wait for STARTED or STOPPED state");
-        }
-        synchronized (this) {
-            if (this.state == ActivityState.FAILED) {
-                throw new HyracksDataException("Feed has failed");
-            } else if (this.state == state) {
-                return NoOpSubscriber.INSTANCE;
-            }
-            return doSubscribe(state);
-        }
-    }
-
-    // Called within synchronized block
-    private FeedEventSubscriber doSubscribe(ActivityState state) {
-        FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state);
-        subscribers.add(subscriber);
-        return subscriber;
-    }
-
-    public String[] getSources() {
-        return sources;
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index c71b8a2..e6ac265 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -34,6 +34,7 @@
     // increase or decrease at any time)
     private final FeedAdapter adapter; // The adapter
     private final AdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
+    private int restartCount = 0;
 
     public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter, AdapterRuntimeManager adapterManager) {
         this.writer = writer;
@@ -81,8 +82,13 @@
                 LOGGER.error("Exception during feed ingestion ", e);
                 continueIngestion = adapter.handleException(e);
                 failedIngestion = !continueIngestion;
+                restartCount++;
             }
         }
         return failedIngestion;
     }
+
+    public String getStats() {
+        return "{\"adapter-stats\": " + adapter.getStats() + ", \"executor-restart-times\": " + restartCount + "}";
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 6214d9f..1b5eeac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -137,4 +137,8 @@
     public void setDone(boolean done) {
         this.done = done;
     }
+
+    public String getStats() {
+        return adapterExecutor.getStats();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
deleted file mode 100644
index 590af01..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveRuntime;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class IngestionRuntime implements IActiveRuntime {
-
-    private static final Logger LOGGER = Logger.getLogger(IngestionRuntime.class.getName());
-
-    private final AdapterRuntimeManager adapterRuntimeManager;
-    private final ActiveRuntimeId runtimeId;
-    private final EntityId feedId;
-
-    public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId, AdapterRuntimeManager adaptorRuntimeManager) {
-        this.feedId = entityId;
-        this.runtimeId = runtimeId;
-        this.adapterRuntimeManager = adaptorRuntimeManager;
-    }
-
-    @Override
-    public ActiveRuntimeId getRuntimeId() {
-        return this.runtimeId;
-    }
-
-    public void start() {
-        adapterRuntimeManager.start();
-        LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " running on partition " + runtimeId);
-    }
-
-    @Override
-    public void stop() throws InterruptedException, HyracksDataException {
-        adapterRuntimeManager.stop();
-        LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId);
-    }
-
-    public EntityId getFeedId() {
-        return feedId;
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
similarity index 63%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 0e931f7..822d725 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -18,37 +18,29 @@
  */
 package org.apache.asterix.external.feed.watch;
 
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FeedEventSubscriber implements IActiveEventSubscriber {
+public abstract class AbstractSubscriber implements IActiveEventSubscriber {
 
-    private final FeedEventsListener listener;
-    private final ActivityState state;
+    protected final IActiveEntityEventsListener listener;
     private boolean done = false;
 
-    public FeedEventSubscriber(FeedEventsListener listener, ActivityState state) {
+    public AbstractSubscriber(IActiveEntityEventsListener listener) {
         this.listener = listener;
-        this.state = state;
-
     }
 
     @Override
-    public synchronized void notify(ActiveEvent event) {
-        if (listener.getState() == state || listener.getState() == ActivityState.FAILED
-                || listener.getState() == ActivityState.STOPPED) {
-            done = true;
-            notifyAll();
-        }
-    }
-
-    @Override
-    public synchronized boolean done() {
+    public synchronized boolean isDone() {
         return done;
     }
 
+    public synchronized void complete() throws HyracksDataException {
+        done = true;
+        notifyAll();
+    }
+
     @Override
     public synchronized void sync() throws InterruptedException {
         while (!done) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
index 9d8c570..42f7a74 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -19,7 +19,9 @@
 package org.apache.asterix.external.feed.watch;
 
 import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An event subscriber that does not listen to any events
@@ -33,22 +35,26 @@
 
     @Override
     public void notify(ActiveEvent event) {
-        // do nothing
+        // no op
     }
 
     @Override
-    public boolean done() {
+    public boolean isDone() {
         return true;
     }
 
     @Override
     public void sync() {
-        // do nothing
+        // no op
     }
 
     @Override
     public void unsubscribe() {
-        // do nothing
+        // no op
     }
 
+    @Override
+    public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+        // no op
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
new file mode 100644
index 0000000..fa2fa7f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class StatsSubscriber extends AbstractSubscriber {
+
+    public StatsSubscriber(IActiveEntityEventsListener listener) throws HyracksDataException {
+        super(listener);
+        listener.subscribe(this);
+    }
+
+    @Override
+    public void notify(ActiveEvent event) throws HyracksDataException {
+        if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) {
+            complete();
+        }
+    }
+
+    @Override
+    public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+        //Does nothing upon subscription
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
new file mode 100644
index 0000000..ea7e3ae
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class WaitForStateSubscriber extends AbstractSubscriber {
+
+    private final ActivityState targetState;
+
+    public WaitForStateSubscriber(IActiveEntityEventsListener listener, ActivityState targetState)
+            throws HyracksDataException {
+        super(listener);
+        this.targetState = targetState;
+        if (targetState != ActivityState.STARTED && targetState != ActivityState.STOPPED) {
+            throw new RuntimeDataException(ErrorCode.CANNOT_WAIT_FOR_STATE, targetState);
+        }
+        listener.subscribe(this);
+    }
+
+    @Override
+    public void notify(ActiveEvent event) throws HyracksDataException {
+        if (listener.getState() == targetState) {
+            complete();
+        }
+    }
+
+    @Override
+    public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
+        if (eventsListener.getState() == ActivityState.FAILED) {
+            throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY);
+        }
+        if (listener.getState() == targetState) {
+            complete();
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 2876ea6..8c6a420 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -105,4 +105,13 @@
             adapterRuntimeManager.stop();
         }
     }
+
+    @Override
+    public String getStats() {
+        if (adapterRuntimeManager != null) {
+            return adapterRuntimeManager.getStats();
+        } else {
+            return "\"Runtime stats is not available.\"";
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 3c41165..4a4c25a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -91,8 +91,7 @@
         }
     }
 
-    protected void sendError(IServletResponse response, HttpResponseStatus status, String message)
-            throws IOException {
+    protected void sendError(IServletResponse response, HttpResponseStatus status, String message) throws IOException {
         response.setStatus(status);
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8);
         if (message != null) {
@@ -154,7 +153,7 @@
 
     public String localPath(IServletRequest request) {
         final String uri = request.getHttpRequest().uri();
-        int queryStart = uri.indexOf("?");
+        int queryStart = uri.indexOf('?');
         return queryStart == -1 ? uri.substring(trim(uri)) : uri.substring(trim(uri), queryStart);
     }
 

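Note for readers of the diff: the wait-for-state logic added in
WaitForStateSubscriber can be pictured with the standalone sketch below. It is
not the AsterixDB code. State, Subscriber, Listener, WaitForState and
SubscriberSketch are hypothetical stand-ins. AbstractSubscriber is not
reproduced in this part of the diff, so the blocking behaviour of sync() and
the effect of complete() are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT the AsterixDB types.
enum State { STARTING, STARTED, STOPPED, FAILED }

interface Subscriber {
    void onEvent(Listener source);    // plays the role of notify(ActiveEvent)
    void subscribed(Listener source); // called once, at subscription time
    boolean isDone();
}

class Listener {
    private final List<Subscriber> subscribers = new ArrayList<>();
    private State state = State.STARTING;

    synchronized State getState() {
        return state;
    }

    synchronized void subscribe(Subscriber s) {
        s.subscribed(this);
        if (!s.isDone()) {
            subscribers.add(s);
        }
    }

    synchronized void setState(State newState) {
        state = newState;
        // Iterate over a copy so that the list can be pruned afterwards.
        for (Subscriber s : new ArrayList<>(subscribers)) {
            s.onEvent(this);
        }
        subscribers.removeIf(Subscriber::isDone);
    }
}

class WaitForState implements Subscriber {
    private final State target;
    private boolean done;

    WaitForState(Listener listener, State target) {
        // Mirrors the diff: only STARTED and STOPPED may be waited for.
        if (target != State.STARTED && target != State.STOPPED) {
            throw new IllegalArgumentException("cannot wait for " + target);
        }
        this.target = target;
        listener.subscribe(this);
    }

    @Override
    public synchronized void subscribed(Listener source) {
        if (source.getState() == State.FAILED) {
            throw new IllegalStateException("cannot subscribe to a failed entity");
        }
        check(source); // the target state may already have been reached
    }

    @Override
    public synchronized void onEvent(Listener source) {
        check(source);
    }

    @Override
    public synchronized boolean isDone() {
        return done;
    }

    // Assumed behaviour: block the caller until the target state was observed.
    synchronized void sync() {
        try {
            while (!done) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Only called from synchronized methods, so notifyAll() holds the monitor.
    private void check(Listener source) {
        if (source.getState() == target) {
            done = true;
            notifyAll();
        }
    }
}

class SubscriberSketch {
    public static void main(String[] args) throws Exception {
        Listener listener = new Listener();
        WaitForState waiter = new WaitForState(listener, State.STARTED);
        Thread starter = new Thread(() -> listener.setState(State.STARTED));
        starter.start();
        waiter.sync(); // returns once STARTED has been observed
        starter.join();
        System.out.println("done = " + waiter.isDone());
    }
}
```

The state is checked both at subscription time and on every event, so a
caller that subscribes after the entity has already reached the target state
does not block.
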
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1748
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I46b48b52a1c9906510c5bdce778d1672845f75ca
Gerrit-PatchSet: 12
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkkwww@gmail.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sjaco002@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xkkwww@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>
