asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/7] asterixdb git commit: Refactor General Active Classes
Date Fri, 22 Jul 2016 13:34:04 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 6b1c5b8..294642e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.feed.runtime;
 
 import java.util.Map;
 
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
@@ -32,16 +34,17 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
  * intake job. For a secondary feed, tuples are collected from the intake/compute
  * runtime associated with the source feed.
  */
-public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
+public class CollectionRuntime extends ActiveRuntime implements ISubscriberRuntime {
 
-    private final FeedConnectionId connectionId;            // [Dataverse - Feed - Dataset]
-    private final ISubscribableRuntime sourceRuntime;       // Runtime that provides the data
-    private final Map<String, String> feedPolicy;           // Policy associated with the feed
-    private final FeedFrameCollector frameCollector;        // Collector that can be plugged into a frame distributor
+    private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset]
+    private final ISubscribableRuntime sourceRuntime; // Runtime that provides the data
+    private final Map<String, String> feedPolicy; // Policy associated with the feed
+    private final FeedFrameCollector frameCollector; // Collector that can be plugged into a frame distributor
     private final IHyracksTaskContext ctx;
 
-    public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ISubscribableRuntime sourceRuntime,
-            Map<String, String> feedPolicy, IHyracksTaskContext ctx, FeedFrameCollector frameCollector) {
+    public CollectionRuntime(FeedConnectionId connectionId, ActiveRuntimeId runtimeId,
+            ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy, IHyracksTaskContext ctx,
+            FeedFrameCollector frameCollector) {
         super(runtimeId);
         this.connectionId = connectionId;
         this.sourceRuntime = sourceRuntime;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
deleted file mode 100644
index 8c86d86..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import org.apache.asterix.external.feed.api.IFeedRuntime;
-
-public class FeedRuntime implements IFeedRuntime {
-
-    /** A unique identifier for the runtime **/
-    protected final FeedRuntimeId runtimeId;
-
-    public FeedRuntime(FeedRuntimeId runtimeId) {
-        this.runtimeId = runtimeId;;
-    }
-
-    @Override
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    @Override
-    public String toString() {
-        return runtimeId.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
deleted file mode 100644
index 18d4cff..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.io.Serializable;
-
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedId;
-
-public class FeedRuntimeId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final String DEFAULT_TARGET_ID = "N/A";
-
-    private final FeedId feedId;
-    private final FeedRuntimeType runtimeType;
-    private final int partition;
-    private final String targetId;
-    private final int hashCode;
-
-    public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) {
-        this.feedId = feedId;
-        this.runtimeType = runtimeType;
-        this.partition = partition;
-        this.targetId = targetId;
-        this.hashCode = toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return runtimeType + "(" + feedId + ")" + "[" + partition + "]" + "==>" + "{" + targetId + "}";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof FeedRuntimeId)) {
-            return false;
-        }
-        FeedRuntimeId other = (FeedRuntimeId) o;
-        return (other.feedId.equals(feedId) && other.getFeedRuntimeType().equals(runtimeType)
-                && other.getTargetId().equals(targetId) && other.getPartition() == partition);
-    }
-
-    @Override
-    public int hashCode() {
-        return hashCode;
-    }
-
-    public FeedRuntimeType getFeedRuntimeType() {
-        return runtimeType;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public FeedRuntimeType getRuntimeType() {
-        return runtimeType;
-    }
-
-    public String getTargetId() {
-        return targetId;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 6cdc2af..8ee3e2b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -20,10 +20,11 @@ package org.apache.asterix.external.feed.runtime;
 
 import java.util.logging.Level;
 
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,7 +35,7 @@ public class IngestionRuntime extends SubscribableRuntime {
     private final IHyracksTaskContext ctx;
     private int numSubscribers = 0;
 
-    public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+    public IngestionRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
             AdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
         super(feedId, runtimeId, feedWriter);
         this.adapterRuntimeManager = adaptorRuntimeManager;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
index e060e27..423e599 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
@@ -22,26 +22,28 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.management.FeedId;
 
-public abstract class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
+public abstract class SubscribableRuntime extends ActiveRuntime implements ISubscribableRuntime {
 
     protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
-    protected final FeedId feedId;
+    protected final EntityId feedId;
     protected final List<ISubscriberRuntime> subscribers;
     protected final DistributeFeedFrameWriter dWriter;
 
-    public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) {
+    public SubscribableRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) {
         super(runtimeId);
         this.feedId = feedId;
         this.dWriter = dWriter;
         this.subscribers = new ArrayList<ISubscriberRuntime>();
     }
 
-    public FeedId getFeedId() {
+    public EntityId getFeedId() {
         return feedId;
     }
 
@@ -49,8 +51,4 @@ public abstract class SubscribableRuntime extends FeedRuntime implements ISubscr
     public String toString() {
         return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
     }
-
-    public FeedRuntimeType getFeedRuntimeType() {
-        return runtimeId.getFeedRuntimeType();
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
index b69a7b3..82cdddf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -21,13 +21,18 @@ package org.apache.asterix.external.feed.watch;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedUtils.JobType;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
-public class FeedConnectJobInfo extends FeedJobInfo {
+public class FeedConnectJobInfo extends ActiveJob {
 
+    private static final long serialVersionUID = 1L;
     private final FeedConnectionId connectionId;
     private final Map<String, String> feedPolicy;
     private final IFeedJoint sourceFeedJoint;
@@ -38,10 +43,10 @@ public class FeedConnectJobInfo extends FeedJobInfo {
     private List<String> storageLocations;
     private int partitionStarts = 0;
 
-    public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
+    public FeedConnectJobInfo(EntityId entityId, JobId jobId, ActivityState state, FeedConnectionId connectionId,
             IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
             Map<String, String> feedPolicy) {
-        super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
+        super(entityId, jobId, state, JobType.FEED_CONNECT, spec);
         this.connectionId = connectionId;
         this.sourceFeedJoint = sourceFeedJoint;
         this.computeFeedJoint = computeFeedJoint;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
index 3b11811..4114e82 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
@@ -20,27 +20,31 @@ package org.apache.asterix.external.feed.watch;
 
 import java.util.List;
 
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedUtils.JobType;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
-public class FeedIntakeInfo extends FeedJobInfo {
+public class FeedIntakeInfo extends ActiveJob {
 
-    private final FeedId feedId;
+    private static final long serialVersionUID = 1L;
+    private final EntityId feedId;
     private final IFeedJoint intakeFeedJoint;
     private final JobSpecification spec;
     private List<String> intakeLocation;
 
-    public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
+    public FeedIntakeInfo(JobId jobId, ActivityState state, EntityId feedId, IFeedJoint intakeFeedJoint,
             JobSpecification spec) {
-        super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
+        super(feedId, jobId, state, JobType.INTAKE, spec);
         this.feedId = feedId;
         this.intakeFeedJoint = intakeFeedJoint;
         this.spec = spec;
     }
 
-    public FeedId getFeedId() {
+    public EntityId getFeedId() {
         return feedId;
     }
 
@@ -48,6 +52,7 @@ public class FeedIntakeInfo extends FeedJobInfo {
         return intakeFeedJoint;
     }
 
+    @Override
     public JobSpecification getSpec() {
         return spec;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
deleted file mode 100644
index 92e00cb..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedJobInfo {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
-
-    public enum JobType {
-        INTAKE,
-        FEED_CONNECT
-    }
-
-    public enum FeedJobState {
-        CREATED,
-        ACTIVE,
-        UNDER_RECOVERY,
-        ENDED
-    }
-
-    protected final JobId jobId;
-    protected final JobType jobType;
-    protected FeedJobState state;
-    protected JobSpecification spec;
-
-    public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
-        this.jobId = jobId;
-        this.state = state;
-        this.jobType = jobType;
-        this.spec = spec;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public FeedJobState getState() {
-        return state;
-    }
-
-    public void setState(FeedJobState state) {
-        this.state = state;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(this + " is in " + state + " state.");
-        }
-    }
-
-    public JobType getJobType() {
-        return jobType;
-    }
-
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public void setSpec(JobSpecification spec) {
-        this.spec = spec;
-    }
-
-    public String toString() {
-        return jobId + " [" + jobType + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 36098ee..84c2cb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -20,13 +20,13 @@ package org.apache.asterix.external.operators;
 
 import java.util.Map;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -56,13 +56,13 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     private final Map<String, String> feedPolicyProperties;
 
     /** The source feed from which the feed derives its data from. **/
-    private final FeedId sourceFeedId;
+    private final EntityId sourceFeedId;
 
     /** The subscription location at which the recipient feed receives tuples from the source feed {SOURCE_FEED_INTAKE_STAGE , SOURCE_FEED_COMPUTE_STAGE} **/
     private final FeedRuntimeType subscriptionLocation;
 
-    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
-            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
+            EntityId sourceFeedId, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
             FeedRuntimeType subscriptionLocation) {
         super(spec, 0, 1);
         this.recordDescriptors[0] = rDesc;
@@ -77,13 +77,11 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+        ActiveManager feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject()).getFeedManager();
-        FeedRuntimeId sourceRuntimeId =
-                new FeedRuntimeId(sourceFeedId, subscriptionLocation, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
-        ISubscribableRuntime sourceRuntime = feedManager.getSubscribableRuntime(sourceRuntimeId);
-        return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties, partition,
-                nPartitions, sourceRuntime);
+        ActiveRuntimeId sourceRuntimeId = new ActiveRuntimeId(sourceFeedId, subscriptionLocation.toString(), partition);
+        ISubscribableRuntime sourceRuntime = (ISubscribableRuntime) feedManager.getSubscribableRuntime(sourceRuntimeId);
+        return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition, sourceRuntime);
     }
 
     public FeedConnectionId getFeedConnectionId() {
@@ -102,7 +100,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
         return recordDescriptors[0];
     }
 
-    public FeedId getSourceFeedId() {
+    public EntityId getSourceFeedId() {
         return sourceFeedId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index aeea6ba..231fe99 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -20,19 +20,18 @@ package org.apache.asterix.external.operators;
 
 import java.util.Map;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -47,46 +46,46 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
     private final FeedConnectionId connectionId;
     private final Map<String, String> feedPolicy;
     private final FeedPolicyAccessor policyAccessor;
-    private final FeedManager feedManager;
+    private final ActiveManager feedManager;
     private final ISubscribableRuntime sourceRuntime;
     private final IHyracksTaskContext ctx;
     private CollectionRuntime collectRuntime;
 
-    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
-            FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
-            ISubscribableRuntime sourceRuntime) {
+    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicy, int partition, ISubscribableRuntime sourceRuntime) {
         this.ctx = ctx;
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
         this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
-        this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+        this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
     }
 
     @Override
     public void initialize() throws HyracksDataException {
         try {
-            FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT, partition,
-                    FeedRuntimeId.DEFAULT_TARGET_ID);
+            ActiveRuntimeId runtimeId =
+                    new ActiveRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT.toString(), partition);
             // Does this collector have a handler?
             FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
             if (policyAccessor.bufferingEnabled()) {
                 writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
-                        feedManager.getFeedMemoryManager());
+                        feedManager.getFramePool());
             } else {
                 writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor);
             }
             collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx,
                     new FeedFrameCollector(policyAccessor, writer, connectionId));
-            feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+            feedManager.getActiveRuntimeRegistry().registerRuntime(collectRuntime);
             sourceRuntime.subscribe(collectRuntime);
             // Notify CC that Collection started
             ctx.sendApplicationMessageToCC(
-                    new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()), null);
+                    new ActivePartitionMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId(), null),
+                    null);
             collectRuntime.waitTillCollectionOver();
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
+            feedManager.getActiveRuntimeRegistry().deregisterRuntime(collectRuntime.getRuntimeId());
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 69aa59a..f4ea60f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -21,11 +21,12 @@ package org.apache.asterix.external.operators;
 import java.util.Map;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -47,7 +48,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
     private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
 
     /** The unique identifier of the feed that is being ingested. **/
-    private final FeedId feedId;
+    private final EntityId feedId;
 
     private final FeedPolicyAccessor policyAccessor;
 
@@ -71,7 +72,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
     public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
             ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
         super(spec, 0, 1);
-        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(),
+                primaryFeed.getFeedName());
         this.adaptorFactory = adapterFactory;
         this.adapterOutputType = adapterOutputType;
         this.policyAccessor = policyAccessor;
@@ -82,7 +84,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
             String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor,
             RecordDescriptor rDesc) {
         super(spec, 0, 1);
-        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(),
+                primaryFeed.getFeedName());
         this.adaptorFactoryClassName = adapterFactoryClassName;
         this.adaptorLibraryName = adapterLibraryName;
         this.adaptorConfiguration = primaryFeed.getAdapterConfiguration();
@@ -124,7 +127,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
         return adapterFactory;
     }
 
-    public FeedId getFeedId() {
+    public EntityId getFeedId() {
         return feedId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 04ef016..ffa451b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,22 +18,21 @@
  */
 package org.apache.asterix.external.operators;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
@@ -43,13 +42,13 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
  */
 public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
-    private final FeedId feedId;
+    private final EntityId feedId;
     private final int partition;
     private final IHyracksTaskContext ctx;
     private final IAdapterFactory adapterFactory;
     private final FeedIntakeOperatorDescriptor opDesc;
 
-    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
+    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
             int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider,
             FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
         this.opDesc = feedIntakeOperatorDescriptor;
@@ -62,7 +61,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
 
     @Override
     public void initialize() throws HyracksDataException {
-        FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+        ActiveManager feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject()).getFeedManager();
         AdapterRuntimeManager adapterRuntimeManager = null;
         DistributeFeedFrameWriter frameDistributor = null;
@@ -73,17 +72,15 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
             // create the adapter
             FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
             // create the distributor
-            frameDistributor = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
-                    new FrameTupleAccessor(recordDesc));
+            frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition);
             // create adapter runtime manager
             adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition);
             // create and register the runtime
-            FeedRuntimeId runtimeId =
-                    new FeedRuntimeId(feedId, FeedRuntimeType.INTAKE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+            ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition);
             ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);
-            feedManager.registerFeedSubscribableRuntime(ingestionRuntime);
+            feedManager.registerRuntime(ingestionRuntime);
             // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-            ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
+            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(feedId, ctx.getJobletContext().getJobId(), null),
                     null);
             // open the distributor
             open = true;
@@ -95,7 +92,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
                 }
             }
             // The ingestion is over. we need to remove the runtime from the manager
-            feedManager.deregisterFeedSubscribableRuntime(ingestionRuntime.getRuntimeId());
+            feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
             // If there was a failure, we need to throw an exception
             if (adapterRuntimeManager.isFailed()) {
                 throw new HyracksDataException("Unable to ingest data");
@@ -108,7 +105,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
              */
             if (ingestionRuntime != null) {
                 ingestionRuntime.terminate();
-                feedManager.deregisterFeedSubscribableRuntime(ingestionRuntime.getRuntimeId());
+                feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
             }
             throw new HyracksDataException(ie);
         } finally {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
index 219110f..61451b1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.operators;
 
-import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.active.IActiveMessage;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -28,17 +28,20 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 
 /**
- * Sends a control message to the registered message queue for feed specified by its feedId.
+ * @deprecated
+ *             Sends a control message to the registered message queue for feed specified by its feedId.
+ *             For messaging, use IMessageBroker interfaces
  */
+@Deprecated
 public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
 
     private final FeedConnectionId connectionId;
-    private final IFeedMessage feedMessage;
+    private final IActiveMessage feedMessage;
 
     public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
-            IFeedMessage feedMessage) {
+            IActiveMessage feedMessage) {
         super(spec, 0, 1);
         this.connectionId = connectionId;
         this.feedMessage = feedMessage;
@@ -47,7 +50,7 @@ public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperato
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition, nPartitions);
+        return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 82bf1da..5f92327 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -21,46 +21,49 @@ package org.apache.asterix.external.operators;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.message.EndFeedMessage;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
- * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
- * a feed message to the local feed manager on the host node controller.
+ * @deprecated
+ *             Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
+ *             a feed message to the local feed manager on the host node controller.
+ *             For messages, use IMessageBroker interfaces
  * @see FeedMessageOperatorDescriptor
  *      IFeedMessage
  *      IFeedManager
  */
+@Deprecated
 public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
 
     private final FeedConnectionId connectionId;
-    private final IFeedMessage message;
-    private final FeedManager feedManager;
+    private final IActiveMessage message;
+    private final ActiveManager feedManager;
     private final int partition;
 
     public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
-            IFeedMessage feedMessage, int partition, int nPartitions) {
+            IActiveMessage feedMessage, int partition) {
         this.connectionId = connectionId;
         this.message = feedMessage;
         this.partition = partition;
         IAsterixAppRuntimeContext runtimeCtx =
                 (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
-        this.feedManager = (FeedManager) runtimeCtx.getFeedManager();
+        this.feedManager = (ActiveManager) runtimeCtx.getFeedManager();
     }
 
     @Override
@@ -77,6 +80,8 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                         case DISCONTINUE_SOURCE:
                             handleDiscontinueFeedTypeMessage(endFeedMessage);
                             break;
+                        default:
+                            break;
                     }
                     break;
                 default:
@@ -90,10 +95,11 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
     }
 
     private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
-        FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
-        FeedRuntimeId subscribableRuntimeId =
-                new FeedRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
-        ISubscribableRuntime feedRuntime = feedManager.getSubscribableRuntime(subscribableRuntimeId);
+        EntityId sourceFeedId = endFeedMessage.getSourceFeedId();
+        ActiveRuntimeId subscribableRuntimeId =
+                new ActiveRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE.toString(), partition);
+        ISubscribableRuntime feedRuntime =
+                (ISubscribableRuntime) feedManager.getSubscribableRuntime(subscribableRuntimeId);
         AdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
         adapterRuntimeManager.stop();
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -105,12 +111,12 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
         }
-        FeedRuntimeId runtimeId = null;
+        ActiveRuntimeId runtimeId;
         FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
         if (endFeedMessage.isCompleteDisconnection()) {
             // subscribableRuntimeType represents the location at which the feed connection receives
             // data
-            FeedRuntimeType runtimeType = null;
+            FeedRuntimeType runtimeType;
             switch (subscribableRuntimeType) {
                 case INTAKE:
                     runtimeType = FeedRuntimeType.COLLECT;
@@ -122,10 +128,9 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
             }
 
-            runtimeId = new FeedRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType, partition,
-                    FeedRuntimeId.DEFAULT_TARGET_ID);
+            runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType.toString(), partition);
             CollectionRuntime feedRuntime =
-                    (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+                    (CollectionRuntime) feedManager.getActiveRuntimeRegistry().getRuntime(runtimeId);
             if (feedRuntime != null) {
                 feedRuntime.getSourceRuntime().unsubscribe(feedRuntime);
             }
@@ -142,11 +147,14 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
                 case COMPUTE:
                     // feed could be primary or secondary, doesn't matter
-                    FeedRuntimeId feedSubscribableRuntimeId = new FeedRuntimeId(connectionId.getFeedId(),
-                            FeedRuntimeType.COMPUTE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
-                    ISubscribableRuntime feedRuntime = feedManager.getSubscribableRuntime(feedSubscribableRuntimeId);
-                    CollectionRuntime feedCollectionRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
-                            .getFeedRuntime(connectionId, runtimeId);
+                    ActiveRuntimeId feedSubscribableRuntimeId = new ActiveRuntimeId(connectionId.getFeedId(),
+                            FeedRuntimeType.COMPUTE.toString(), partition);
+                    ISubscribableRuntime feedRuntime =
+                            (ISubscribableRuntime) feedManager.getSubscribableRuntime(feedSubscribableRuntimeId);
+                    runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(),
+                            FeedRuntimeType.COMPUTE_COLLECT.toString(), partition);
+                    CollectionRuntime feedCollectionRuntime =
+                            (CollectionRuntime) feedManager.getActiveRuntimeRegistry().getRuntime(runtimeId);
                     feedRuntime.unsubscribe(feedCollectionRuntime);
                     break;
                 default:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 8d8bc28..54e17ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -23,17 +23,17 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -63,7 +63,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
      * The Feed Runtime instance associated with the operator. Feed Runtime
      * captures the state of the operator while the feed is active.
      */
-    private FeedRuntime feedRuntime;
+    private ActiveRuntime feedRuntime;
 
     /**
      * A unique identifier for the feed instance. A feed instance represents
@@ -78,7 +78,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
     private int partition;
 
     /** The (singleton) instance of IFeedManager **/
-    private FeedManager feedManager;
+    private ActiveManager feedManager;
 
     private FrameTupleAccessor fta;
 
@@ -109,7 +109,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
         this.partition = partition;
         this.connectionId = feedConnectionId;
-        this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+        this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.message = new VSizeFrame(ctx);
         ctx.setSharedObject(message);
@@ -119,8 +119,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
 
     @Override
     public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId =
-                new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+        ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString(), partition);
         try {
             initializeNewFeedRuntime(runtimeId);
             opened = true;
@@ -131,18 +130,18 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         }
     }
 
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+    private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws Exception {
         fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
         FeedPolicyAccessor fpa = policyEnforcer.getFeedPolicyAccessor();
         coreOperator.setOutputFrameWriter(0, writer, recordDesc);
         if (fpa.bufferingEnabled()) {
             writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator, fpa, fta,
-                    feedManager.getFeedMemoryManager());
+                    feedManager.getFramePool());
         } else {
             writer = new SyncFeedRuntimeInputHandler(ctx, coreOperator, fta);
         }
-        feedRuntime = new FeedRuntime(runtimeId);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+        feedRuntime = new ActiveRuntime(runtimeId);
+        feedManager.getActiveRuntimeRegistry().registerRuntime(feedRuntime);
     }
 
     @Override
@@ -173,7 +172,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
     }
 
     private void deregister() {
-        feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+        feedManager.getActiveRuntimeRegistry().deregisterRuntime(feedRuntime.getRuntimeId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index c7dd3d2..908601d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -20,8 +20,8 @@ package org.apache.asterix.external.operators;
 
 import java.util.Map;
 
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 47df39e..6f679f7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -23,17 +23,17 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -61,7 +61,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
      * The Feed Runtime instance associated with the operator. Feed Runtime
      * captures the state of the operator while the feed is active.
      */
-    private FeedRuntime feedRuntime;
+    private ActiveRuntime feedRuntime;
 
     /**
      * A unique identifier for the feed instance. A feed instance represents
@@ -79,7 +79,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
 
     /** The (singleton) instance of IFeedManager **/
-    private final FeedManager feedManager;
+    private final ActiveManager feedManager;
 
     private FrameTupleAccessor fta;
 
@@ -103,7 +103,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
         this.partition = partition;
         this.connectionId = feedConnectionId;
-        this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+        this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.targetId = targetId;
         this.message = new VSizeFrame(ctx);
@@ -114,7 +114,8 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     @Override
     public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, targetId);
+        ActiveRuntimeId runtimeId =
+                new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition);
         try {
             initializeNewFeedRuntime(runtimeId);
             insertOperator.open();
@@ -124,7 +125,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         }
     }
 
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+    private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws Exception {
         fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
         insertOperator.setOutputFrameWriter(0, writer, recordDesc);
         if (insertOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
@@ -138,7 +139,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         }
         if (policyEnforcer.getFeedPolicyAccessor().bufferingEnabled()) {
             writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, insertOperator,
-                    policyEnforcer.getFeedPolicyAccessor(), fta, feedManager.getFeedMemoryManager());
+                    policyEnforcer.getFeedPolicyAccessor(), fta, feedManager.getFramePool());
         } else {
             writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta);
         }
@@ -146,9 +147,10 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     }
 
     private void setupBasicRuntime(IFrameWriter frameWriter) throws Exception {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, targetId);
-        feedRuntime = new FeedRuntime(runtimeId);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+        ActiveRuntimeId runtimeId =
+                new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition);
+        feedRuntime = new ActiveRuntime(runtimeId);
+        feedManager.getActiveRuntimeRegistry().registerRuntime(feedRuntime);
     }
 
     @Override
@@ -177,7 +179,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     }
 
     private void deregister() {
-        feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+        feedManager.getActiveRuntimeRegistry().deregisterRuntime(feedRuntime.getRuntimeId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 8228c39..6b7eb31 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -42,6 +42,29 @@ import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FeedUtils {
+
+    public enum JobType {
+        INTAKE,
+        FEED_CONNECT
+    }
+
+    public enum FeedRuntimeType {
+        INTAKE,
+        COLLECT,
+        COMPUTE_COLLECT,
+        COMPUTE,
+        STORE,
+        OTHER,
+        ETS,
+        JOIN
+    }
+
+    public enum Mode {
+        PROCESS,            // There is memory
+        SPILL,              // Memory budget has been consumed. Now we're writing to disk
+        DISCARD             // Memory and Disk space budgets have been consumed. Now we're discarding
+    }
+
     private static String prepareDataverseFeedName(String dataverseName, String feedName) {
         return dataverseName + File.separator + feedName;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
index 49042b8..0f6a2ea 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -24,9 +24,9 @@ import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import org.apache.asterix.active.ConcurrentFramePool;
+import org.apache.asterix.active.FrameAction;
 import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.dataflow.FrameAction;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.junit.Assert;
 import org.mockito.Mockito;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index bc1c328..e643206 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -25,13 +25,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ConcurrentFramePool;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -72,10 +72,9 @@ public class InputHandlerTest extends TestCase {
     private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
             FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
         FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
-        FeedId feedId = new FeedId(DATAVERSE, FEED);
+        EntityId feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, DATAVERSE, FEED);
         FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
-        FeedRuntimeId runtimeId =
-                new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_TARGET_ID);
+        ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0);
         return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 092bf69..14f229a 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -24,11 +24,11 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivity;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -52,7 +52,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
  * Represents the AQL statement for subscribing to a feed.
  * This AQL statement is private and may not be used by the end-user.
  */
-public class SubscribeFeedStatement implements Statement {
+public class SubscribeFeedStatement extends Statement {
 
     private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
     private final FeedConnectionRequest connectionRequest;
@@ -71,10 +71,10 @@ public class SubscribeFeedStatement implements Statement {
 
     public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
         this.query = new Query();
-        FeedId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
-        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
-                connectionRequest.getReceivingFeedId().getDataverse(),
-                connectionRequest.getReceivingFeedId().getFeedName());
+        EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
+        Feed subscriberFeed =
+                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
+                        connectionRequest.getReceivingFeedId().getEntityName());
         if (subscriberFeed == null) {
             throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
         }
@@ -100,8 +100,9 @@ public class SubscribeFeedStatement implements Statement {
 
         builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
         builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'"
-                + sourceFeedId.getFeedName() + "'" + "," + "'" + connectionRequest.getReceivingFeedId().getFeedName()
-                + "'" + "," + "'" + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
+                + sourceFeedId.getEntityName() + "'" + "," + "'"
+                + connectionRequest.getReceivingFeedId().getEntityName() + "'" + "," + "'"
+                + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
                 + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")");
 
         List<String> functionsToApply = connectionRequest.getFunctionsToApply();
@@ -156,8 +157,8 @@ public class SubscribeFeedStatement implements Statement {
     }
 
     @Override
-    public Kind getKind() {
-        return Kind.SUBSCRIBE_FEED;
+    public byte getKind() {
+        return Statement.SUBSCRIBE_FEED;
     }
 
     public String getPolicy() {
@@ -179,8 +180,8 @@ public class SubscribeFeedStatement implements Statement {
 
     private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException {
         String outputType = null;
-        FeedId feedId = connectionRequest.getReceivingFeedId();
-        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName());
+        EntityId feedId = connectionRequest.getReceivingFeedId();
+        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName());
         FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters());
         try {
             switch (feed.getFeedType()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
index 2fc4b14..f30d4a6 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
@@ -70,7 +70,7 @@ public abstract class RangeMapBuilder {
         }
 
         // Translate the query into a Range Map
-        if (hintStatements.get(0).getKind() != Statement.Kind.QUERY) {
+        if (hintStatements.get(0).getKind() != Statement.QUERY) {
             throw new AsterixException("Not a proper query for the range hint.");
         }
         Query q = (Query) hintStatements.get(0);
@@ -151,8 +151,8 @@ public abstract class RangeMapBuilder {
         int fieldIndex = 0;
         int fieldType = rangeMap.getTag(0, 0);
         AqlBinaryComparatorFactoryProvider comparatorFactory = AqlBinaryComparatorFactoryProvider.INSTANCE;
-        IBinaryComparatorFactory bcf = comparatorFactory
-                .getBinaryComparatorFactory(ATypeTag.VALUE_TYPE_MAPPING[fieldType], ascending);
+        IBinaryComparatorFactory bcf =
+                comparatorFactory.getBinaryComparatorFactory(ATypeTag.VALUE_TYPE_MAPPING[fieldType], ascending);
         IBinaryComparator comparator = bcf.createBinaryComparator();
         int c = 0;
         for (int split = 1; split < rangeMap.getSplitCount(); ++split) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 3184d1e..6f0f1f1 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -18,45 +18,52 @@
  */
 package org.apache.asterix.lang.common.base;
 
-public interface Statement extends ILangExpression {
-    public enum Kind {
-        DATASET_DECL,
-        DATAVERSE_DECL,
-        DATAVERSE_DROP,
-        DATASET_DROP,
-        DELETE,
-        INSERT,
-        UPSERT,
-        UPDATE,
-        DML_CMD_LIST,
-        FUNCTION_DECL,
-        LOAD,
-        NODEGROUP_DECL,
-        NODEGROUP_DROP,
-        QUERY,
-        SET,
-        TYPE_DECL,
-        TYPE_DROP,
-        WRITE,
-        CREATE_INDEX,
-        INDEX_DECL,
-        CREATE_DATAVERSE,
-        INDEX_DROP,
-        CREATE_PRIMARY_FEED,
-        CREATE_SECONDARY_FEED,
-        DROP_FEED,
-        CONNECT_FEED,
-        DISCONNECT_FEED,
-        SUBSCRIBE_FEED,
-        CREATE_FEED_POLICY,
-        DROP_FEED_POLICY,
-        CREATE_FUNCTION,
-        FUNCTION_DROP,
-        COMPACT,
-        EXTERNAL_DATASET_REFRESH,
-        RUN
-    }
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
-    public abstract Kind getKind();
+public abstract class Statement implements ILangExpression {
+    public static final byte DATASET_DECL = 0x00;
+    public static final byte DATAVERSE_DECL = 0x01;
+    public static final byte DATAVERSE_DROP = 0x02;
+    public static final byte DATASET_DROP = 0x03;
+    public static final byte DELETE = 0x04;
+    public static final byte INSERT = 0x05;
+    public static final byte UPSERT = 0x06;
+    public static final byte UPDATE = 0x07;
+    public static final byte DML_CMD_LIST = 0x08;
+    public static final byte FUNCTION_DECL = 0x09;
+    public static final byte LOAD = 0x0a;
+    public static final byte NODEGROUP_DECL = 0x0b;
+    public static final byte NODEGROUP_DROP = 0x0c;
+    public static final byte QUERY = 0x0d;
+    public static final byte SET = 0x0e;
+    public static final byte TYPE_DECL = 0x0f;
+    public static final byte TYPE_DROP = 0x10;
+    public static final byte WRITE = 0x11;
+    public static final byte CREATE_INDEX = 0x12;
+    public static final byte INDEX_DECL = 0x13;
+    public static final byte CREATE_DATAVERSE = 0x14;
+    public static final byte INDEX_DROP = 0x15;
+    public static final byte CREATE_PRIMARY_FEED = 0x16;
+    public static final byte CREATE_SECONDARY_FEED = 0x17;
+    public static final byte DROP_FEED = 0x18;
+    public static final byte CONNECT_FEED = 0x19;
+    public static final byte DISCONNECT_FEED = 0x1a;
+    public static final byte SUBSCRIBE_FEED = 0x1b;
+    public static final byte CREATE_FEED_POLICY = 0x1c;
+    public static final byte DROP_FEED_POLICY = 0x1d;
+    public static final byte CREATE_FUNCTION = 0x1e;
+    public static final byte FUNCTION_DROP = 0x1f;
+    public static final byte COMPACT = 0x20;
+    public static final byte EXTERNAL_DATASET_REFRESH = 0x21;
+    public static final byte RUN = 0x22;
+    public static final List<Byte> VALUES = Collections.unmodifiableList(
+            Arrays.asList(DATASET_DECL, DATAVERSE_DECL, DATAVERSE_DROP, DATASET_DROP, DELETE, INSERT, UPSERT, UPDATE,
+                    DML_CMD_LIST, FUNCTION_DECL, LOAD, NODEGROUP_DECL, NODEGROUP_DROP, QUERY, SET, TYPE_DECL, TYPE_DROP,
+                    WRITE, CREATE_INDEX, INDEX_DECL, CREATE_DATAVERSE, INDEX_DROP, CREATE_PRIMARY_FEED,
+                    CREATE_SECONDARY_FEED, DROP_FEED, CONNECT_FEED, DISCONNECT_FEED, SUBSCRIBE_FEED, CREATE_FEED_POLICY,
+                    DROP_FEED_POLICY, CREATE_FUNCTION, FUNCTION_DROP, COMPACT, EXTERNAL_DATASET_REFRESH, RUN));
 
+    public abstract byte getKind();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
index 93d151c..531957f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
@@ -23,7 +23,7 @@ import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 
-public class CompactStatement implements Statement {
+public class CompactStatement extends Statement {
 
     private final Identifier dataverseName;
     private final Identifier datasetName;
@@ -34,8 +34,8 @@ public class CompactStatement implements Statement {
     }
 
     @Override
-    public Kind getKind() {
-        return Kind.COMPACT;
+    public byte getKind() {
+        return Statement.COMPACT;
     }
 
     public Identifier getDataverseName() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index b4208b9..33e3340 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -25,7 +25,7 @@ import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
-public class ConnectFeedStatement implements Statement {
+public class ConnectFeedStatement extends Statement {
 
     private final Identifier dataverseName;
     private final Identifier datasetName;
@@ -77,8 +77,8 @@ public class ConnectFeedStatement implements Statement {
     }
 
     @Override
-    public Kind getKind() {
-        return Kind.CONNECT_FEED;
+    public byte getKind() {
+        return Statement.CONNECT_FEED;
     }
 
     public String getPolicy() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index 1eb8372..820ae5f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -23,7 +23,7 @@ import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 
-public class CreateDataverseStatement implements Statement {
+public class CreateDataverseStatement extends Statement {
 
     private Identifier dataverseName;
     private String format;
@@ -31,10 +31,11 @@ public class CreateDataverseStatement implements Statement {
 
     public CreateDataverseStatement(Identifier dataverseName, String format, boolean ifNotExists) {
         this.dataverseName = dataverseName;
-        if (format == null)
+        if (format == null) {
             this.format = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
-        else
+        } else {
             this.format = format;
+        }
         this.ifNotExists = ifNotExists;
     }
 
@@ -51,8 +52,8 @@ public class CreateDataverseStatement implements Statement {
     }
 
     @Override
-    public Kind getKind() {
-        return Kind.CREATE_DATAVERSE;
+    public byte getKind() {
+        return Statement.CREATE_DATAVERSE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
index bd3192c..e972cad 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
@@ -24,7 +24,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 
-public class CreateFeedPolicyStatement implements Statement {
+public class CreateFeedPolicyStatement extends Statement {
 
     private final String policyName;
     private final String sourcePolicyName;
@@ -58,8 +58,8 @@ public class CreateFeedPolicyStatement implements Statement {
     }
 
     @Override
-    public Kind getKind() {
-        return Statement.Kind.CREATE_FEED_POLICY;
+    public byte getKind() {
+        return Statement.CREATE_FEED_POLICY;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
index 53d05d2..9635836 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
@@ -25,7 +25,7 @@ import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
-public abstract class CreateFeedStatement implements Statement {
+public abstract class CreateFeedStatement extends Statement {
 
     private final Pair<Identifier, Identifier> qName;
     private final FunctionSignature appliedFunction;
@@ -55,9 +55,6 @@ public abstract class CreateFeedStatement implements Statement {
     }
 
     @Override
-    public abstract Kind getKind();
-
-    @Override
     public abstract <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException;
 
 }


Mime
View raw message