asterixdb-commits mailing list archives

From: amo...@apache.org
Subject: [4/7] asterixdb git commit: Refactor General Active Classes
Date: Fri, 22 Jul 2016 13:34:06 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 295b308..4df4468 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -37,11 +37,14 @@ import java.util.Random;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.app.external.ExternalIndexingOperations;
 import org.apache.asterix.app.external.FeedJoint;
-import org.apache.asterix.app.external.FeedLifecycleListener;
 import org.apache.asterix.app.external.FeedOperations;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -61,15 +64,17 @@ import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
 import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedEventsListener;
 import org.apache.asterix.external.feed.management.FeedJointKey;
 import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.DatasetOperations;
 import org.apache.asterix.file.DataverseOperations;
 import org.apache.asterix.file.IndexOperations;
@@ -134,6 +139,7 @@ import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
@@ -195,7 +201,7 @@ import com.google.common.collect.Lists;
  */
 public class QueryTranslator extends AbstractLangTranslator {
 
-    private static Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName());
 
     private enum ProgressState {
         NO_PROGRESS,
@@ -226,9 +232,9 @@ public class QueryTranslator extends AbstractLangTranslator {
     }
 
     private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
-        List<FunctionDecl> functionDecls = new ArrayList<FunctionDecl>();
+        List<FunctionDecl> functionDecls = new ArrayList<>();
         for (Statement st : statements) {
-            if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
+            if (st.getKind() == Statement.FUNCTION_DECL) {
                 functionDecls.add((FunctionDecl) st);
             }
         }
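Throughout the patch, statement kinds move from the Statement.Kind enum to int constants defined on Statement itself, so dispatch becomes an identity check (==) and the switch labels in the next hunk read Statement.SET, Statement.QUERY, and so on. A minimal sketch of the pattern, with illustrative constant values that are not taken from the patch:

    // Hypothetical stand-in for the refactored API: statement kinds are
    // int constants on Statement rather than values of a nested Kind enum.
    interface Statement {
        int FUNCTION_DECL = 0; // illustrative value only
        int QUERY = 1;         // illustrative value only

        int getKind();
    }

    class KindCheck {
        static boolean isFunctionDecl(Statement st) {
            // identity test replaces st.getKind().equals(Statement.Kind.FUNCTION_DECL)
            return st.getKind() == Statement.FUNCTION_DECL;
        }
    }
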
@@ -258,175 +264,138 @@ public class QueryTranslator extends AbstractLangTranslator {
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
         IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
-        Map<String, String> config = new HashMap<String, String>();
-
-        for (Statement stmt : statements) {
-            if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
-                sessionConfig.out().println(APIFramework.HTML_STATEMENT_SEPARATOR);
-            }
-            validateOperation(activeDefaultDataverse, stmt);
-            rewriteStatement(stmt); // Rewrite the statement's AST.
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
-            metadataProvider.setWriterFactory(writerFactory);
-            metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
-            metadataProvider.setOutputFile(outputFile);
-            metadataProvider.setConfig(config);
-            switch (stmt.getKind()) {
-                case SET: {
-                    handleSetStatement(metadataProvider, stmt, config);
-                    break;
-                }
-                case DATAVERSE_DECL: {
-                    activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
-                    break;
-                }
-                case CREATE_DATAVERSE: {
-                    handleCreateDataverseStatement(metadataProvider, stmt);
-                    break;
-                }
-                case DATASET_DECL: {
-                    handleCreateDatasetStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case CREATE_INDEX: {
-                    handleCreateIndexStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case TYPE_DECL: {
-                    handleCreateTypeStatement(metadataProvider, stmt);
-                    break;
-                }
-                case NODEGROUP_DECL: {
-                    handleCreateNodeGroupStatement(metadataProvider, stmt);
-                    break;
-                }
-                case DATAVERSE_DROP: {
-                    handleDataverseDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case DATASET_DROP: {
-                    handleDatasetDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case INDEX_DROP: {
-                    handleIndexDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case TYPE_DROP: {
-                    handleTypeDropStatement(metadataProvider, stmt);
-                    break;
-                }
-                case NODEGROUP_DROP: {
-                    handleNodegroupDropStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case CREATE_FUNCTION: {
-                    handleCreateFunctionStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case FUNCTION_DROP: {
-                    handleFunctionDropStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case LOAD: {
-                    handleLoadStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case INSERT:
-                case UPSERT: {
-                    handleInsertUpsertStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case DELETE: {
-                    handleDeleteStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CREATE_PRIMARY_FEED:
-                case CREATE_SECONDARY_FEED: {
-                    handleCreateFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DROP_FEED: {
-                    handleDropFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DROP_FEED_POLICY: {
-                    handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CONNECT_FEED: {
-                    handleConnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DISCONNECT_FEED: {
-                    handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case SUBSCRIBE_FEED: {
-                    handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CREATE_FEED_POLICY: {
-                    handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case QUERY: {
-                    metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                    metadataProvider.setResultAsyncMode(
-                            resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
-                    handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
-                    break;
-                }
-
-                case COMPACT: {
-                    handleCompactStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case EXTERNAL_DATASET_REFRESH: {
-                    handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case WRITE: {
-                    Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(metadataProvider, stmt);
-                    if (result.first != null) {
-                        writerFactory = result.first;
-                    }
-                    outputFile = result.second;
-                    break;
-                }
-
-                case RUN: {
-                    handleRunStatement(metadataProvider, stmt, hcc);
-                    break;
+        Map<String, String> config = new HashMap<>();
+        /* The system runs a large number of threads, so when an HTTP request hangs it is hard to
+         * find the thread that is executing the request and determine where it stopped.
+         * Setting the thread name makes that search easier.
+         */
+        String threadName = Thread.currentThread().getName();
+        Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
+        try {
+            for (Statement stmt : statements) {
+                if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
+                    sessionConfig.out().println(APIFramework.HTML_STATEMENT_SEPARATOR);
+                }
+                validateOperation(activeDefaultDataverse, stmt);
+                rewriteStatement(stmt); // Rewrite the statement's AST.
+                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+                metadataProvider.setWriterFactory(writerFactory);
+                metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
+                metadataProvider.setOutputFile(outputFile);
+                metadataProvider.setConfig(config);
+                switch (stmt.getKind()) {
+                    case Statement.SET:
+                        handleSetStatement(stmt, config);
+                        break;
+                    case Statement.DATAVERSE_DECL:
+                        activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.CREATE_DATAVERSE:
+                        handleCreateDataverseStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.DATASET_DECL:
+                        handleCreateDatasetStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.CREATE_INDEX:
+                        handleCreateIndexStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.TYPE_DECL:
+                        handleCreateTypeStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.NODEGROUP_DECL:
+                        handleCreateNodeGroupStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.DATAVERSE_DROP:
+                        handleDataverseDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.DATASET_DROP:
+                        handleDatasetDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.INDEX_DROP:
+                        handleIndexDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.TYPE_DROP:
+                        handleTypeDropStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.NODEGROUP_DROP:
+                        handleNodegroupDropStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.CREATE_FUNCTION:
+                        handleCreateFunctionStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.FUNCTION_DROP:
+                        handleFunctionDropStatement(metadataProvider, stmt);
+                        break;
+                    case Statement.LOAD:
+                        handleLoadStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.INSERT:
+                    case Statement.UPSERT:
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.DELETE:
+                        handleDeleteStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.CREATE_PRIMARY_FEED:
+                    case Statement.CREATE_SECONDARY_FEED:
+                        handleCreateFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.DROP_FEED:
+                        handleDropFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.DROP_FEED_POLICY:
+                        handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.CONNECT_FEED:
+                        handleConnectFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.DISCONNECT_FEED:
+                        handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.SUBSCRIBE_FEED:
+                        handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.CREATE_FEED_POLICY:
+                        handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.QUERY:
+                        metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                        metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
+                                || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
+                        break;
+                    case Statement.COMPACT:
+                        handleCompactStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.EXTERNAL_DATASET_REFRESH:
+                        handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.WRITE:
+                        Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(stmt);
+                        writerFactory = (result.first != null) ? result.first : writerFactory;
+                        outputFile = result.second;
+                        break;
+                    case Statement.RUN:
+                        handleRunStatement(metadataProvider, stmt, hcc);
+                        break;
+                    default:
+                        // The default case should delegate unknown statements to the extension manager.
+                        break;
                 }
-
-                default:
-                    break;
             }
+        } finally {
+            Thread.currentThread().setName(threadName);
         }
     }
 
-    private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config) {
+    private void handleSetStatement(Statement stmt, Map<String, String> config) {
         SetStatement ss = (SetStatement) stmt;
         String pname = ss.getPropName();
         String pvalue = ss.getPropValue();
         config.put(pname, pvalue);
     }
 
-    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(Statement stmt)
             throws InstantiationException, IllegalAccessException, ClassNotFoundException {
         WriteStatement ws = (WriteStatement) stmt;
         File f = new File(ws.getFileName());
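The try/finally introduced in the hunk above guarantees the worker thread gets its original name back even when a handler throws; while a request is running, a hung thread can be spotted in a thread dump by its QueryTranslator name. A standalone sketch of the pattern, using only java.lang.Thread:

    String savedName = Thread.currentThread().getName();
    Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
    try {
        // ... translate and execute the statements ...
    } finally {
        // Restore the pooled thread's original name so later dumps stay meaningful.
        Thread.currentThread().setName(savedName);
    }
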
@@ -435,7 +404,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         if (ws.getWriterClassName() != null) {
             writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
         }
-        return new Pair<IAWriterFactory, FileSplit>(writerFactory, outputFile);
+        return new Pair<>(writerFactory, outputFile);
     }
 
     private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
@@ -497,8 +466,8 @@ public class QueryTranslator extends AbstractLangTranslator {
             throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
         }
         String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class
-                .forName(compactionPolicyFactoryClassName).newInstance();
+        ILSMMergePolicyFactory mergePolicyFactory =
+                (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
         if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
             throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
         }
@@ -566,8 +535,8 @@ public class QueryTranslator extends AbstractLangTranslator {
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            String ngName = ngNameId != null ? ngNameId.getValue()
-                    : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
+            String ngName =
+                    ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -576,7 +545,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
             }
             switch (dd.getDatasetType()) {
-                case INTERNAL: {
+                case INTERNAL:
                     IAType itemType = dt.getDatatype();
                     if (itemType.getTypeTag() != ATypeTag.RECORD) {
                         throw new AlgebricksException("Dataset type has to be a record type.");
@@ -591,10 +560,10 @@ public class QueryTranslator extends AbstractLangTranslator {
                     }
                     ARecordType metaRecType = (ARecordType) metaItemType;
 
-                    List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getPartitioningExprs();
-                    List<Integer> keySourceIndicators = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getKeySourceIndicators();
+                    List<List<String>> partitioningExprs =
+                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
+                    List<Integer> keySourceIndicators =
+                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators();
                     boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
                     ARecordType aRecordType = (ARecordType) itemType;
                     List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType,
@@ -617,16 +586,15 @@ public class QueryTranslator extends AbstractLangTranslator {
                             InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
                             keySourceIndicators, partitioningTypes, autogenerated, filterField, temp);
                     break;
-                }
-                case EXTERNAL: {
+                case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
                     Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
                     datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
                             ExternalDatasetTransactionState.COMMIT);
                     break;
-                }
-
+                default:
+                    throw new AsterixException("Unknown dataset type " + dd.getDatasetType());
             }
 
             // #. initialize DatasetIdFactory if it is not initialized.
@@ -642,10 +610,10 @@ public class QueryTranslator extends AbstractLangTranslator {
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
 
             if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                        dataverseName);
-                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
-                        metadataProvider);
+                Dataverse dataverse =
+                        MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
+                JobSpecification jobSpec =
+                        DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
 
                 // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -719,24 +687,21 @@ public class QueryTranslator extends AbstractLangTranslator {
     }
 
     private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
-        List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-        boolean resourceInUse = false;
-        StringBuilder builder = new StringBuilder();
-
-        if (activeFeedConnections != null && !activeFeedConnections.isEmpty()) {
-            for (FeedConnectionId connId : activeFeedConnections) {
-                if (connId.getDatasetName().equals(datasetName)) {
-                    resourceInUse = true;
-                    builder.append(connId + "\n");
+        StringBuilder builder = null;
+        IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+        for (IActiveEntityEventsListener listener : listeners) {
+            if (listener instanceof FeedEventsListener
+                    && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+                if (builder == null) {
+                    builder = new StringBuilder();
                 }
+                builder.append(listener.getEntityId() + "\n");
             }
         }
-
-        if (resourceInUse) {
-            throw new AsterixException("Dataset " + datasetName + " is currently being "
+        if (builder != null) {
+            throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being "
                     + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
         }
-
     }
 
     private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
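The rewritten validateIfResourceIsActiveInFeed above no longer consults the feed-specific FeedLifecycleListener singleton; it scans the generic ActiveJobNotificationHandler registry, filters for FeedEventsListener instances connected to the dataset, and allocates the error message lazily. A condensed sketch of that scan, using the method names as they appear in the patch:

    StringBuilder inUse = null;
    for (IActiveEntityEventsListener listener : ActiveJobNotificationHandler.INSTANCE.getEventListeners()) {
        if (listener instanceof FeedEventsListener
                && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
            if (inUse == null) {
                inUse = new StringBuilder(); // allocate only once a connection is found
            }
            inUse.append(listener.getEntityId()).append('\n');
        }
    }
    if (inUse != null) {
        throw new AsterixException("Dataset " + datasetName + " is in use by:\n" + inUse);
    }
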
@@ -747,13 +712,13 @@ public class QueryTranslator extends AbstractLangTranslator {
         if (hintValue == null) {
             return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
         } else {
-            return (dataverse + ":" + dd.getName().getValue());
+            return dataverse + ":" + dd.getName().getValue();
         }
     }
 
     private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
             throws AsterixException {
-        int nodegroupCardinality = -1;
+        int nodegroupCardinality;
         String nodegroupName;
         String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
         if (hintValue == null) {
@@ -849,8 +814,8 @@ public class QueryTranslator extends AbstractLangTranslator {
             int keyIndex = 0;
             for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) {
                 IAType fieldType = null;
-                ARecordType subType = KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType,
-                        metaRecordType);
+                ARecordType subType =
+                        KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType, metaRecordType);
                 boolean isOpen = subType.isOpen();
                 int i = 0;
                 if (fieldExpr.first.size() > 1 && !isOpen) {
@@ -877,8 +842,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                     if (stmtCreateIndex.hasMetaField()) {
                         throw new AlgebricksException("Typed open index can only be created on the record part");
                     }
-                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
-                            indexName, dataverseName);
+                    Map<TypeSignature, IAType> typeMap =
+                            TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second, indexName, dataverseName);
                     TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
                     fieldType = typeMap.get(typeSignature);
                 }
@@ -1079,8 +1044,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                         ExternalIndexingOperations.getFilesIndexName(datasetName));
                 try {
-                    JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                            metadataProvider, ds);
+                    JobSpecification jobSpec =
+                            ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
                     JobUtils.runJob(hcc, jobSpec, true);
@@ -1100,8 +1065,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 try {
-                    JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                            ds);
+                    JobSpecification jobSpec =
+                            IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
 
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
@@ -1223,32 +1188,21 @@ public class QueryTranslator extends AbstractLangTranslator {
                     throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
                 }
             }
-
             // # disconnect all feeds from any datasets in the dataverse.
-            List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
-                    .getActiveFeedConnections(null);
-            DisconnectFeedStatement disStmt = null;
+            IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             Identifier dvId = new Identifier(dataverseName);
-            for (FeedConnectionId connection : activeFeedConnections) {
-                FeedId feedId = connection.getFeedId();
-                if (feedId.getDataverse().equals(dataverseName)) {
-                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()),
-                            new Identifier(connection.getDatasetName()));
-                    try {
-                        handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Disconnected feed " + feedId.getFeedName() + " from dataset "
-                                    + connection.getDatasetName());
-                        }
-                    } catch (Exception exception) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to disconnect feed " + feedId.getFeedName() + " from dataset "
-                                    + connection.getDatasetName() + ". Encountered exception " + exception);
-                        }
+            for (IActiveEntityEventsListener listener : activeListeners) {
+                EntityId activeEntityId = listener.getEntityId();
+                if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
+                        && activeEntityId.getDataverse().equals(dataverseName)) {
+                    FeedEventsListener feedEventListener = (FeedEventsListener) listener;
+                    FeedConnectionId[] connections = feedEventListener.getConnections();
+                    for (FeedConnectionId conn : connections) {
+                        disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc);
                     }
                     // prepare job to remove feed log storage
                     jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
-                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
+                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
                 }
             }
 
@@ -1258,8 +1212,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 String datasetName = datasets.get(j).getDatasetName();
                 DatasetType dsType = datasets.get(j).getDatasetType();
                 if (dsType == DatasetType.INTERNAL) {
-                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                            datasetName);
+                    List<Index> indexes =
+                            MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
                             CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
@@ -1273,8 +1227,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                     jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 } else {
                     // External dataset
-                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                            datasetName);
+                    List<Index> indexes =
+                            MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
                             CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
@@ -1358,6 +1312,24 @@ public class QueryTranslator extends AbstractLangTranslator {
         }
     }
 
+    private void disconnectFeedBeforeDelete(Identifier dvId, EntityId activeEntityId, FeedConnectionId conn,
+            AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc) {
+        DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId,
+                new Identifier(activeEntityId.getEntityName()), new Identifier(conn.getDatasetName()));
+        try {
+            handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Disconnected feed " + activeEntityId.getEntityName() + " from dataset "
+                        + conn.getDatasetName());
+            }
+        } catch (Exception exception) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to disconnect feed " + activeEntityId.getEntityName() + " from dataset "
+                        + conn.getDatasetName() + ". Encountered exception " + exception);
+            }
+        }
+    }
+
     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropStatement stmtDelete = (DropStatement) stmt;
@@ -1387,20 +1359,23 @@ public class QueryTranslator extends AbstractLangTranslator {
             Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-                List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-                if (feedConnections != null && !feedConnections.isEmpty()) {
-                    for (FeedConnectionId connection : feedConnections) {
-                        Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
-                                connection);
-                        disconnectJobList.put(connection, p);
+                IActiveEntityEventsListener[] feedConnections =
+                        ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+                for (IActiveEntityEventsListener conn : feedConnections) {
+                    if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
+                            && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) {
+                        FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName);
+                        Pair<JobSpecification, Boolean> p =
+                                FeedOperations.buildDisconnectFeedJobSpec(metadataProvider, connectionId);
+                        disconnectJobList.put(connectionId, p);
                         if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
-                                    + datasetName + " as dataset is being dropped");
+                            LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName()
+                                    + " from dataset " + datasetName + " as dataset is being dropped");
                         }
                         // prepare job to remove feed log storage
-                        jobsToExecute
-                                .add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx,
-                                        connection.getFeedId().getDataverse(), connection.getFeedId().getFeedName())));
+                        jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+                                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionId.getFeedId().getDataverse(),
+                                        connectionId.getFeedId().getEntityName())));
                     }
                 }
 
@@ -1555,23 +1530,22 @@ public class QueryTranslator extends AbstractLangTranslator {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
-
-            List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-            boolean resourceInUse = false;
-            if (feedConnections != null && !feedConnections.isEmpty()) {
-                StringBuilder builder = new StringBuilder();
-                for (FeedConnectionId connection : feedConnections) {
-                    if (connection.getDatasetName().equals(datasetName)) {
-                        resourceInUse = true;
-                        builder.append(connection + "\n");
+            IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            StringBuilder builder = null;
+            for (IActiveEntityEventsListener listener : listeners) {
+                if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
+                        && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+                    if (builder == null) {
+                        builder = new StringBuilder();
                     }
-                }
-                if (resourceInUse) {
-                    throw new AsterixException(
-                            "Dataset" + datasetName + " is currently being fed into by the following feeds " + "."
-                                    + builder.toString() + "\nOperation not supported.");
+                    builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n");
                 }
             }
+            if (builder != null) {
+                throw new AsterixException(
+                        "Dataset " + datasetName + " is currently being fed into by the following feeds:\n"
+                                + builder.toString() + "\nOperation not supported.");
+            }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 indexName = stmtIndexDrop.getIndexName().getValue();
@@ -1628,8 +1602,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                        datasetName);
+                List<Index> datasetIndexes =
+                        MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
                     // only one index + the files index, we need to delete both of the indexes
@@ -1846,11 +1820,11 @@ public class QueryTranslator extends AbstractLangTranslator {
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         try {
-            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName,
-                    loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
-                    loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = apiFramework.compileQuery(null, metadataProvider, null, 0, null, sessionConfig,
-                    cls);
+            CompiledLoadFromFileStatement cls =
+                    new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
+                            loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+            JobSpecification spec =
+                    apiFramework.compileQuery(null, metadataProvider, null, 0, null, sessionConfig, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1882,11 +1856,11 @@ public class QueryTranslator extends AbstractLangTranslator {
             metadataProvider.setWriteTransaction(true);
             CompiledInsertStatement clfrqs = null;
             switch (stmtInsertUpsert.getKind()) {
-                case INSERT:
+                case Statement.INSERT:
                     clfrqs = new CompiledInsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
                             query, stmtInsertUpsert.getVarCounter());
                     break;
-                case UPSERT:
+                case Statement.UPSERT:
                     clfrqs = new CompiledUpsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
                             query, stmtInsertUpsert.getVarCounter());
                     break;
@@ -1957,8 +1931,8 @@ public class QueryTranslator extends AbstractLangTranslator {
             throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
-        Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
-                sessionConfig);
+        Pair<Query, Integer> reWrittenQuery =
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionConfig);
 
         // Query Compilation (happens under the same ongoing metadata transaction)
         JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
@@ -1989,13 +1963,13 @@ public class QueryTranslator extends AbstractLangTranslator {
             }
 
             switch (stmt.getKind()) {
-                case CREATE_PRIMARY_FEED:
+                case Statement.CREATE_PRIMARY_FEED:
                     CreatePrimaryFeedStatement cpfs = (CreatePrimaryFeedStatement) stmt;
                     String adaptorName = cpfs.getAdaptorName();
                     feed = new Feed(dataverseName, feedName, cfs.getAppliedFunction(), FeedType.PRIMARY, feedName,
                             adaptorName, cpfs.getAdaptorConfiguration());
                     break;
-                case CREATE_SECONDARY_FEED:
+                case Statement.CREATE_SECONDARY_FEED:
                     CreateSecondaryFeedStatement csfs = (CreateSecondaryFeedStatement) stmt;
                     feed = new Feed(dataverseName, feedName, csfs.getAppliedFunction(), FeedType.SECONDARY,
                             csfs.getSourceFeedName(), null, null);
@@ -2026,8 +2000,8 @@ public class QueryTranslator extends AbstractLangTranslator {
         policy = cfps.getPolicyName();
         MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
         try {
-            FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
-                    .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
+            FeedPolicyEntity feedPolicy =
+                    MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
             if (feedPolicy != null) {
                 if (cfps.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2094,19 +2068,19 @@ public class QueryTranslator extends AbstractLangTranslator {
                 return;
             }
 
-            FeedId feedId = new FeedId(dataverseName, feedName);
-            List<FeedConnectionId> activeConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(feedId);
-            if (activeConnections != null && !activeConnections.isEmpty()) {
+            EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+            FeedEventsListener listener =
+                    (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId);
+            if (listener != null) {
                 StringBuilder builder = new StringBuilder();
-                for (FeedConnectionId connectionId : activeConnections) {
+                for (FeedConnectionId connectionId : listener.getConnections()) {
                     builder.append(connectionId.getDatasetName() + "\n");
                 }
-
                 throw new AlgebricksException("Feed " + feedId
                         + " is currently active and connected to the following dataset(s) \n" + builder.toString());
             } else {
                 JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(
-                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName()));
+                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
                 JobUtils.runJob(hcc, spec, true);
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
             }
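A feed is now addressed by the generic EntityId(extension, dataverse, name) triple rather than the feed-specific FeedId, and its event listener is fetched from the shared registry. The lookup idiom used in the hunk above:

    EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
    FeedEventsListener listener =
            (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId);
    if (listener != null) {
        // the feed is active; report its connections instead of dropping it
    }
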
@@ -2160,61 +2134,62 @@ public class QueryTranslator extends AbstractLangTranslator {
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
-
         boolean bActiveTxn = true;
-
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         boolean subscriberRegistered = false;
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
-
+        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName());
+        FeedEventsListener listener =
+                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
         MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
                 dataverseName + "." + feedName);
         try {
             metadataProvider.setWriteTransaction(true);
-
             CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(),
                     cfs.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
-
             FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
                     metadataProvider.getMetadataTxnContext());
-
             Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(),
                     metadataProvider.getMetadataTxnContext());
-
             feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
-
-            subscriberRegistered = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId, eventSubscriber);
+            if (listener != null) {
+                subscriberRegistered = listener.isFeedConnectionActive(feedConnId, eventSubscriber);
+            }
             if (subscriberRegistered) {
                 throw new AsterixException("Feed " + cfs.getFeedName() + " is already connected to dataset "
                         + cfs.getDatasetName().getValue());
             }
-
-            FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
-                    mdTxnCtx);
-
+            FeedPolicyEntity feedPolicy =
+                    FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx);
             // All Metadata checks have passed. Feed connect request is valid. //
-
+            if (listener == null) {
+                listener = new FeedEventsListener(entityId);
+                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            }
             FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
-            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName,
-                    feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
+            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple =
+                    getFeedConnectionRequest(dataverseName, feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
             FeedConnectionRequest connectionRequest = triple.first;
             boolean createFeedIntakeJob = triple.second;
-            FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
+            listener.registerFeedEventSubscriber(eventSubscriber);
             subscriberRegistered = true;
             if (createFeedIntakeJob) {
-                FeedId feedId = connectionRequest.getFeedJointKey().getFeedId();
-                Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(),
-                        feedId.getFeedName());
-                Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
-                        metadataProvider, policyAccessor);
+                EntityId feedId = connectionRequest.getFeedJointKey().getFeedId();
+                Feed primaryFeed =
+                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName());
+                Pair<JobSpecification, IAdapterFactory> pair =
+                        FeedOperations.buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor);
                 // adapter configuration are valid at this stage
                 // register the feed joints (these are auto-de-registered)
                 int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length;
                 for (IFeedJoint fj : triple.third) {
-                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, numOfPrividers);
+                    listener.registerFeedJoint(fj, numOfPrividers);
                 }
+                FeedIntakeInfo activeJob = new FeedIntakeInfo(null, ActivityState.ACTIVE, feed.getFeedId(),
+                        triple.third.get(0), pair.first);
+                pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
                 JobUtils.runJob(hcc, pair.first, false);
                 /*
                  * TODO: Fix record tracking
@@ -2227,7 +2202,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
-                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, 0);
+                    listener.registerFeedJoint(fj, 0);
                 }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2245,7 +2220,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
                     dataverseName + "." + feedName);
             if (subscriberRegistered) {
-                FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
+                listener.deregisterFeedEventSubscriber(eventSubscriber);
             }
         }
     }
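Connecting a feed now creates and registers its FeedEventsListener on first use instead of relying on a pre-registered singleton; event subscribers and feed joints then attach to that per-entity listener. The registration idiom, as it appears in the hunk above:

    FeedEventsListener listener =
            (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
    if (listener == null) {
        listener = new FeedEventsListener(entityId);
        ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
    }
    listener.registerFeedEventSubscriber(eventSubscriber);
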
@@ -2260,27 +2235,33 @@ public class QueryTranslator extends AbstractLangTranslator {
      * @param feedPolicy
      * @param mdTxnCtx
      * @return
-     * @throws MetadataException
+     * @throws AsterixException
      */
     private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws MetadataException {
+            throws AsterixException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
         List<String> functionsToApply = new ArrayList<String>();
         boolean needIntakeJob = false;
         List<IFeedJoint> jointsToRegister = new ArrayList<IFeedJoint>();
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), dataset);
-
         FeedRuntimeType connectionLocation = null;
         FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx);
-        boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
+        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverse, feed.getFeedName());
+        FeedEventsListener listener =
+                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+        if (listener == null) {
+            throw new AsterixException("Feed Listener is not registered");
+        }
+
+        boolean isFeedJointAvailable = listener.isFeedJointAvailable(feedJointKey);
         if (!isFeedJointAvailable) {
-            sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
+            sourceFeedJoint = listener.getAvailableFeedJoint(feedJointKey);
             if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
                 connectionLocation = FeedRuntimeType.INTAKE;
-                FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
-                Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
+                EntityId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
+                Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getEntityName());
                 FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<String>());
                 sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation,
                         FeedJointType.INTAKE, connectionId);
@@ -2306,7 +2287,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 jointsToRegister.add(computeFeedJoint);
             }
         } else {
-            sourceFeedJoint = FeedLifecycleListener.INSTANCE.getFeedJoint(feedJointKey);
+            sourceFeedJoint = listener.getFeedJoint(feedJointKey);
             connectionLocation = sourceFeedJoint.getConnectionLocation();
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Feed joint " + sourceFeedJoint + " is available! need not apply any further computation");
@@ -2331,8 +2312,8 @@ public class QueryTranslator extends AbstractLangTranslator {
             if (sourceFeed.getAppliedFunction() != null) {
                 appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName());
             }
-            Feed parentFeed = MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(),
-                    sourceFeed.getSourceFeedName());
+            Feed parentFeed =
+                    MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(), sourceFeed.getSourceFeedName());
             sourceFeed = parentFeed;
         }
 
@@ -2354,16 +2335,16 @@ public class QueryTranslator extends AbstractLangTranslator {
 
         FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
-
+        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
-        boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId,
-                eventSubscriber);
-        if (!isFeedConnectionActive) {
-            throw new AsterixException("Feed " + feed.getFeedId().getFeedName() + " is currently not connected to "
+        FeedEventsListener listener =
+                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+        if (listener == null || !listener.isConnectedToDataset(datasetName)) {
+            throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
         }
-
+        listener.registerFeedEventSubscriber(eventSubscriber);
         MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
                 dataverseName + "." + cfs.getFeedName());
         try {
@@ -2373,8 +2354,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 throw new AsterixException(
                         "Unknown dataset: " + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName);
             }
-            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations
-                    .buildDisconnectFeedJobSpec(metadataProvider, connectionId);
+            Pair<JobSpecification, Boolean> specDisconnectType =
+                    FeedOperations.buildDisconnectFeedJobSpec(metadataProvider, connectionId);
             JobSpecification jobSpec = specDisconnectType.first;
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2405,8 +2386,8 @@ public class QueryTranslator extends AbstractLangTranslator {
         SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
         bfs.initialize(metadataProvider.getMetadataTxnContext());
 
-        CompiledSubscribeFeedStatement csfs = new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
-                bfs.getVarCounter());
+        CompiledSubscribeFeedStatement csfs =
+                new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(), bfs.getVarCounter());
         metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy());
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
@@ -2418,14 +2399,28 @@ public class QueryTranslator extends AbstractLangTranslator {
         String dataverse = feedConnectionId.getFeedId().getDataverse();
         String dataset = feedConnectionId.getDatasetName();
         MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset,
-                dataverse + "." + feedConnectionId.getFeedId().getFeedName());
+                dataverse + "." + feedConnectionId.getFeedId().getEntityName());
         try {
             JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(compiled, feedConnectionId,
                     bfs.getSubscriptionRequest().getPolicyParameters());
+            FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, bfs.getPolicy());
+            if (policy == null) {
+                policy = BuiltinFeedPolicies.getFeedPolicy(bfs.getPolicy());
+                if (policy == null) {
+                    throw new AlgebricksException("Unknown feed policy:" + bfs.getPolicy());
+                }
+            }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
             if (compiled != null) {
+                FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
+                        .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
+                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(),
+                        null, ActivityState.ACTIVE,
+                        new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset),
+                        listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties());
+                alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
                 JobUtils.runJob(hcc, alteredJobSpec, false);
             }
 
@@ -2437,7 +2432,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset,
-                    dataverse + "." + feedConnectionId.getFeedId().getFeedName());
+                    dataverse + "." + feedConnectionId.getFeedId().getEntityName());
         }
     }
 
@@ -2462,10 +2457,9 @@ public class QueryTranslator extends AbstractLangTranslator {
                     ds.getItemTypeDataverseName(), itemTypeName);
             ARecordType metaRecordType = null;
             if (ds.hasMetaPart()) {
-                metaRecordType = (ARecordType) MetadataManager.INSTANCE
-                        .getDatatype(metadataProvider.getMetadataTxnContext(), ds.getMetaItemTypeDataverseName(),
-                                ds.getMetaItemTypeName())
-                        .getDatatype();
+                metaRecordType =
+                        (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                                ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName()).getDatatype();
             }
             // Prepare jobs to compact the dataset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
@@ -2473,8 +2467,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 throw new AlgebricksException(
                         "Cannot compact the external dataset " + datasetName + " because it has no indexes");
             }
-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                    dataverseName);
+            Dataverse dataverse =
+                    MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
             jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
             ARecordType enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, indexes);
@@ -2959,8 +2953,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                 throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
             }
             // Cleans up the sink dataset -- Drop and then Create.
-            DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
-                    true);
+            DropStatement dropStmt =
+                    new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(), true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
             IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(),
                     toIndex.getKeyFieldSourceIndicators(), false, null, toDataset.getDatasetDetails().isTemp());

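Every hunk above makes the same move: feed state is no longer reached through the
FeedLifecycleListener.INSTANCE singleton but through a per-entity FeedEventsListener
resolved from ActiveJobNotificationHandler by EntityId. A minimal sketch of the new
lookup idiom, using only types and calls that appear in this diff (the wrapper class
name is illustrative, and the import paths are assumed from the commit's package
layout):

import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.feed.management.FeedEventsListener;
import org.apache.asterix.metadata.entities.Feed;

public final class FeedListenerLookup {

    private FeedListenerLookup() {
    }

    // Resolve the events listener registered for a feed, failing fast when
    // none is registered (the same null check getFeedConnectionRequest gains
    // in the hunk above).
    public static FeedEventsListener lookup(String dataverse, String feedName) throws AsterixException {
        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverse, feedName);
        FeedEventsListener listener =
                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
        if (listener == null) {
            throw new AsterixException("Feed Listener is not registered");
        }
        return listener;
    }
}

The listener then owns everything the singleton used to: the feed joints
(isFeedJointAvailable, getFeedJoint, registerFeedJoint), the event subscribers, and,
in the subscribe-feed hunk, the source joint that is packed into a FeedConnectJobInfo
and attached to the job specification before the job runs.
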
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 06ec6bc..324194d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -34,8 +34,8 @@ import org.apache.asterix.api.http.servlet.QueryStatusAPIServlet;
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
 import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
+import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
-import org.apache.asterix.app.external.FeedLifecycleListener;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.AsterixExternalProperties;
@@ -101,7 +101,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
 
         AsterixAppContextInfo.getInstance().getCCApplicationContext()
-                .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+                .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
 
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         setupWebServer(externalProperties);
@@ -120,6 +120,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     @Override
     public void stop() throws Exception {
+        ActiveLifecycleListener.INSTANCE.stop();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopping Asterix cluster controller");
         }

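Read together with the QueryTranslator changes, these hunks give the cluster
controller a symmetric lifecycle for active entities: the generalized listener is
registered for job lifecycle events at startup and explicitly stopped at shutdown, a
step the feed-specific singleton never had. Condensed into one view (only the two
statements are verbatim from the hunks; the framing comments are added):

// During CCApplicationEntryPoint.start(): all job lifecycle events now
// flow to the active-entity listener instead of the feed listener.
AsterixAppContextInfo.getInstance().getCCApplicationContext()
        .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);

// During CCApplicationEntryPoint.stop(): tear the listener down before
// the rest of the cluster controller shuts down.
ActiveLifecycleListener.INSTANCE.stop();
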
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 9dd4025..138b620 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -25,7 +25,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.app.external.FeedLifecycleListener;
+import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
 import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
@@ -37,7 +38,6 @@ import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
 import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -81,8 +81,8 @@ public class CCMessageBroker implements ICCMessageBroker {
             case COMPLETE_FAILBACK_RESPONSE:
                 handleCompleteFailbcakResponse(message);
                 break;
-            case FEED_PROVIDER_READY:
-                handleFeedProviderReady(message);
+            case ACTIVE_ENTITY_MESSAGE:
+                handleActiveEntityMessage(message);
                 break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
@@ -90,9 +90,8 @@ public class CCMessageBroker implements ICCMessageBroker {
         }
     }
 
-    private void handleFeedProviderReady(IMessage message) {
-        FeedPartitionStartMessage msg = (FeedPartitionStartMessage) message;
-        FeedLifecycleListener.INSTANCE.notifyPartitionStart(msg.getFeedId(), msg.getJobId());
+    private void handleActiveEntityMessage(IMessage message) {
+        ActiveLifecycleListener.INSTANCE.receive((ActivePartitionMessage) message);
     }
 
     private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {

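The broker-side change removes the last feed-specific message path: instead of
unwrapping a FeedPartitionStartMessage and calling notifyPartitionStart on the feed
listener, the CC forwards any ActivePartitionMessage wholesale and lets
ActiveLifecycleListener route it. The dispatch shape, with the routing
responsibility spelled out (code lines are from the hunk; the comments are added):

case ACTIVE_ENTITY_MESSAGE:
    handleActiveEntityMessage(message);
    break;

private void handleActiveEntityMessage(IMessage message) {
    // The broker stays entity-agnostic: ActiveLifecycleListener looks up
    // the IActiveEntityEventsListener registered for the message's entity
    // and delivers the event, so new active-entity types need no new case.
    ActiveLifecycleListener.INSTANCE.receive((ActivePartitionMessage) message);
}
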
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 0989545..6bddfa5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -36,7 +36,6 @@ import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.IQueryRewriter;
 import org.apache.asterix.lang.common.base.IRewriterFactory;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Statement.Kind;
 import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.FunctionDecl;
@@ -129,7 +128,7 @@ public class ParserTestExecutor extends TestExecutor {
             when(aqlMetadataProvider.findDataset(anyString(), anyString())).thenReturn(mock(Dataset.class));
 
             for (Statement st : statements) {
-                if (st.getKind() == Kind.QUERY) {
+                if (st.getKind() == Statement.QUERY) {
                     Query query = (Query) st;
                     IQueryRewriter rewriter = sqlppRewriterFactory.createQueryRewriter();
                     rewrite(rewriter, functions, query, aqlMetadataProvider,
@@ -157,7 +156,7 @@ public class ParserTestExecutor extends TestExecutor {
     private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
         List<FunctionDecl> functionDecls = new ArrayList<FunctionDecl>();
         for (Statement st : statements) {
-            if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
+            if (st.getKind() == Statement.FUNCTION_DECL) {
                 functionDecls.add((FunctionDecl) st);
             }
         }
@@ -167,7 +166,7 @@ public class ParserTestExecutor extends TestExecutor {
     // Gets the default dataverse for the input statements.
     private String getDefaultDataverse(List<Statement> statements) {
         for (Statement st : statements) {
-            if (st.getKind().equals(Statement.Kind.DATAVERSE_DECL)) {
+            if (st.getKind() == Statement.DATAVERSE_DECL) {
                 DataverseDecl dv = (DataverseDecl) st;
                 return dv.getDataverseName().getValue();
             }

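These test changes track an API change in Statement: the nested Statement.Kind enum
is gone and statement kinds are now constants on Statement itself, so identity
comparison with == replaces .equals(). Side by side, reconstructed from the two
sides of the hunks:

// Old: kinds were values of the nested Statement.Kind enum.
if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) { /* ... */ }

// New: kinds are constants on Statement; == comparison suffices.
if (st.getKind() == Statement.FUNCTION_DECL) { /* ... */ }
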
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index fba74e8..7940680 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -36,7 +36,7 @@ public interface IApplicationMessage extends IMessage {
         COMPLETE_FAILBACK_REQUEST,
         COMPLETE_FAILBACK_RESPONSE,
         REPLICA_EVENT,
-        FEED_PROVIDER_READY
+        ACTIVE_ENTITY_MESSAGE
     }
 
     public abstract ApplicationMessageType getMessageType();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index e35146c..08a30e3 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -211,6 +211,21 @@
       <artifactId>algebricks-compiler</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-hdfs-core</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-active</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>com.kenai.nbpwr</groupId>
       <artifactId>org-apache-commons-io</artifactId>
       <version>1.3.1-201002241208</version>
@@ -255,21 +270,11 @@
       <version>1.0.1-modified-01</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-hdfs-core</artifactId>
-      <version>${hyracks.version}</version>
-    </dependency>
-    <dependency>
       <groupId>jdom</groupId>
       <artifactId>jdom</artifactId>
       <version>1.0</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-common</artifactId>
-      <version>0.8.9-SNAPSHOT</version>
-    </dependency>
-    <dependency>
       <groupId>com.microsoft.windowsazure</groupId>
       <artifactId>microsoft-windowsazure-api</artifactId>
       <version>0.4.4</version>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
index 0dd87d6..dac8fbb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -18,10 +18,10 @@
  */
 package org.apache.asterix.external.feed.api;
 
-import org.apache.asterix.external.feed.watch.FeedJobInfo;
+import org.apache.asterix.active.ActiveJob;
 
 public class FeedOperationCounter {
-    private FeedJobInfo feedJobInfo;
+    private ActiveJob feedJobInfo;
     private int partitionCount;
     private boolean failedIngestion = false;
 
@@ -45,11 +45,15 @@ public class FeedOperationCounter {
         this.failedIngestion = failedIngestion;
     }
 
-    public FeedJobInfo getFeedJobInfo() {
+    public ActiveJob getFeedJobInfo() {
         return feedJobInfo;
     }
 
-    public void setFeedJobInfo(FeedJobInfo feedJobInfo) {
+    public void setFeedJobInfo(ActiveJob feedJobInfo) {
         this.feedJobInfo = feedJobInfo;
     }
+
+    public int decrementAndGet() {
+        return --partitionCount;
+    }
 }

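One caution on the new decrementAndGet(): the name echoes
java.util.concurrent.atomic, but --partitionCount is a plain read-modify-write that
is only safe if every caller synchronizes on the same monitor. If the count is ever
decremented from concurrent job-event callbacks, an atomic field is the safer shape.
A minimal sketch of that alternative (this is not what the patch does; the class
name is illustrative):

import java.util.concurrent.atomic.AtomicInteger;

public class PartitionCountdown {

    private final AtomicInteger partitionCount;

    public PartitionCountdown(int partitions) {
        this.partitionCount = new AtomicInteger(partitions);
    }

    // Atomic decrement: safe from concurrent callbacks without external
    // locking, and returns the updated count like the original method.
    public int decrementAndGet() {
        return partitionCount.decrementAndGet();
    }
}
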
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
index 6865522..ce0b3d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java
@@ -21,8 +21,8 @@ package org.apache.asterix.external.feed.api;
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.feed.management.FeedId;
 
 public interface IFeed extends Serializable {
 
@@ -46,7 +46,7 @@ public interface IFeed extends Serializable {
 
     public String getDataverseName();
 
-    public FeedId getFeedId();
+    public EntityId getFeedId();
 
     public Map<String, String> getAdapterConfiguration();
 

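IFeed shows the mechanical half of the patch in miniature: FeedId becomes the
extension-neutral EntityId, which the other hunks build as (extension name,
dataverse, entity name) and read back through getDataverse()/getEntityName(). A
self-contained sketch (literal values are made up; the old FeedId constructor shape
is assumed):

import org.apache.asterix.active.EntityId;
import org.apache.asterix.metadata.entities.Feed;

public final class EntityIdDemo {

    public static void main(String[] args) {
        // Old shape (assumed): FeedId feedId = new FeedId("SocialData", "TwitterFeed");
        EntityId feedId = new EntityId(Feed.EXTENSION_NAME, "SocialData", "TwitterFeed");
        System.out.println(feedId.getDataverse());  // SocialData
        System.out.println(feedId.getEntityName()); // replaces FeedId.getFeedName()
    }
}
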
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
deleted file mode 100644
index 503715b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedConnectionManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedRuntimeManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- */
-public interface IFeedConnectionManager {
-
-    /**
-     * Allows registration of a feedRuntime.
-     *
-     * @param feedRuntime
-     * @throws Exception
-     */
-    public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime);
-
-    /**
-     * Obtain feed runtime corresponding to a feedRuntimeId
-     *
-     * @param feedRuntimeId
-     * @return
-     */
-    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
-
-    /**
-     * De-register a feed
-     *
-     * @param feedConnection
-     * @throws IOException
-     */
-    void deregisterFeed(FeedConnectionId feedConnection);
-
-    /**
-     * Obtain the feed runtime manager associated with a feed.
-     *
-     * @param feedConnection
-     * @return
-     */
-    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedConnection);
-
-    /**
-     * Allows de-registration of a feed runtime.
-     *
-     * @param feedRuntimeId
-     */
-    void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
-
-    public List<FeedRuntimeId> getRegisteredRuntimes();
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
index 477ec36..548be4a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedJoint.java
@@ -18,15 +18,16 @@
  */
 package org.apache.asterix.external.feed.api;
 
+import java.io.Serializable;
 import java.util.List;
 
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.management.FeedJointKey;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 
-public interface IFeedJoint {
+public interface IFeedJoint extends Serializable {
 
     public enum FeedJointType {
         /** Feed Joint is located at the intake stage of a primary feed **/
@@ -100,7 +101,7 @@ public interface IFeedJoint {
      */
     public void removeReceiver(FeedConnectionId connectionId);
 
-    public FeedId getOwnerFeedId();
+    public EntityId getOwnerFeedId();
 
     /**
      * Add a feed connectionId to the set of registered subscribers

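IFeedJoint picks up Serializable because of the subscribe-feed hunk in
QueryTranslator: the source joint travels inside the FeedConnectJobInfo that is set
as a property on the job specification, so it has to survive Java serialization
along with the spec. An illustrative round-trip check (the helper class is not part
of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.asterix.external.feed.api.IFeedJoint;

public final class SerializationCheck {

    private SerializationCheck() {
    }

    // Write the joint out and read it back; this throws
    // NotSerializableException if any field of the concrete FeedJoint
    // does not itself implement Serializable.
    public static IFeedJoint roundTrip(IFeedJoint joint) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(joint);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (IFeedJoint) ois.readObject();
        }
    }
}
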
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
deleted file mode 100644
index 3302856..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.util.List;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.hyracks.api.job.IJobLifecycleListener;
-
-public interface IFeedLifecycleListener extends IJobLifecycleListener {
-    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJoinKey);
-
-    public boolean isFeedJointAvailable(FeedJointKey feedJoinKey);
-
-    public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId);
-
-    public List<String> getComputeLocations(FeedId feedId);
-
-    public List<String> getIntakeLocations(FeedId feedId);
-
-    public List<String> getStoreLocations(FeedConnectionId feedId);
-
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber);
-
-    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber);
-
-    public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
-
-    boolean isFeedConnectionActive(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber eventSubscriber);
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
deleted file mode 100644
index 15e2de6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.api.dataflow.value.JSONSerializable;
-
-/**
- * A control message exchanged between {@Link IFeedManager} and {@Link CentralFeedManager} that requests for an action or reporting of an event
- */
-public interface IFeedMessage extends Serializable, JSONSerializable {
-
-    public enum MessageType {
-        END
-    }
-
-    /**
-     * Gets the type associated with this message
-     *
-     * @return MessageType type associated with this message
-     */
-    public MessageType getMessageType();
-
-}

