asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [16/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:58 GMT
Add Asterix Extension Manager

More extension support is added. A user can now provide implementations
of the IExtension interface, which gives them more control over
the behavior of the system and the ability to add custom
features.
Initial customizations include:
1. Metadata Tuple Translators
2. Metadata Datasets
3. Query Translators
4. Statement Handlers
5. Lang Compilation Provider

Change-Id: I280268495cc3aad00f898cba21f7299f7120ce5c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1017
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ab81748a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ab81748a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ab81748a

Branch: refs/heads/master
Commit: ab81748abce39d8a368d5069d96d96357bedfe6e
Parents: a4815d3
Author: Yingyi Bu <yingyi@couchbase.com>
Authored: Fri Aug 19 21:29:31 2016 -0700
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Fri Aug 19 23:15:04 2016 -0700

----------------------------------------------------------------------
 .../org/apache/asterix/active/ActiveEvent.java  |   32 +-
 .../active/ActiveJobNotificationHandler.java    |   51 +-
 .../apache/asterix/active/ActiveManager.java    |   41 +-
 .../asterix/active/ActivePartitionMessage.java  |   55 -
 .../apache/asterix/active/ActiveRuntime.java    |   39 -
 .../apache/asterix/active/ActiveRuntimeId.java  |   20 +-
 .../asterix/active/ActiveRuntimeManager.java    |    6 +-
 .../asterix/active/ActiveRuntimeRegistry.java   |   79 -
 .../ActiveSourceOperatorNodePushable.java       |  125 +
 .../apache/asterix/active/IActiveRuntime.java   |   15 +-
 .../asterix/active/IActiveRuntimeRegistry.java  |   49 -
 .../active/message/ActiveManagerMessage.java    |   55 +
 .../active/message/ActivePartitionMessage.java  |   68 +
 .../asterix/algebra/base/ILangExtension.java    |   64 +
 .../extension/ExtensionFunctionIdentifier.java  |   78 +
 .../extension/IAlgebraExtensionManager.java     |   34 +
 .../algebra/extension/IExtensionStatement.java  |   48 +
 .../asterix/optimizer/base/RuleCollections.java |    7 +-
 .../rules/MetaFunctionToMetaVariableRule.java   |   14 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   35 +-
 .../am/AbstractIntroduceAccessMethodRule.java   |  129 +-
 .../optimizer/rules/am/BTreeAccessMethod.java   |  120 +-
 .../rules/am/IntroduceJoinAccessMethodRule.java |    8 +-
 .../am/IntroduceLSMComponentFilterRule.java     |   34 +-
 .../am/IntroduceSelectAccessMethodRule.java     |    4 +-
 .../rules/am/InvertedIndexAccessMethod.java     |   85 +-
 .../rules/am/OptimizableOperatorSubTree.java    |  286 +-
 .../optimizer/rules/am/RTreeAccessMethod.java   |   62 +-
 .../translator/AbstractLangTranslator.java      |    6 +-
 .../AqlExpressionToPlanTranslator.java          |    2 +-
 .../AqlPlusExpressionToPlanTranslator.java      |    4 +-
 .../asterix/translator/IStatementExecutor.java  |  129 +
 .../translator/IStatementExecutorFactory.java   |   46 +
 .../LangExpressionToPlanTranslator.java         |  250 +-
 .../translator/util/PlanTranslationUtil.java    |    1 -
 .../asterix-algebra/src/main/javacc/AQLPlus.jj  |    2 +-
 .../apache/asterix/api/common/APIFramework.java |   23 +-
 .../api/common/AsterixAppRuntimeContext.java    |  465 ---
 ...rixAppRuntimeContextProviderForRecovery.java |    5 +-
 .../common/AsterixHyracksIntegrationUtil.java   |    6 +-
 .../asterix/api/common/SessionConfig.java       |  263 --
 .../asterix/api/http/servlet/APIServlet.java    |   42 +-
 .../asterix/api/http/servlet/AQLAPIServlet.java |   16 +-
 .../api/http/servlet/ClusterAPIServlet.java     |    4 +-
 .../api/http/servlet/ConnectorAPIServlet.java   |    8 +-
 .../asterix/api/http/servlet/DDLAPIServlet.java |   25 +-
 .../api/http/servlet/QueryAPIServlet.java       |   19 +-
 .../api/http/servlet/QueryResultAPIServlet.java |   17 +-
 .../api/http/servlet/QueryServiceServlet.java   |   39 +-
 .../api/http/servlet/QueryStatusAPIServlet.java |   15 +-
 .../api/http/servlet/RESTAPIServlet.java        |   36 +-
 .../api/http/servlet/UpdateAPIServlet.java      |   20 +-
 .../asterix/api/java/AsterixJavaClient.java     |   22 +-
 .../app/cc/CompilerExtensionManager.java        |  156 +
 .../app/cc/IStatementExecutorExtension.java     |   37 +
 .../app/external/ActiveLifecycleListener.java   |   15 +-
 .../app/external/ExternalLibraryUtils.java      |    8 +-
 .../app/external/FeedWorkCollection.java        |   15 +-
 .../app/nc/AsterixNCAppRuntimeContext.java      |  475 +++
 .../asterix/app/nc/NCExtensionManager.java      |  110 +
 .../asterix/app/result/ResultPrinter.java       |  190 ++
 .../apache/asterix/app/result/ResultReader.java |   64 +
 .../apache/asterix/app/result/ResultUtil.java   |  254 ++
 .../DefaultStatementExecutorFactory.java        |   43 +
 .../asterix/app/translator/QueryTranslator.java | 3122 ++++++++++++++++++
 .../asterix/aql/translator/QueryTranslator.java | 3107 -----------------
 .../org/apache/asterix/drivers/AsterixCLI.java  |    8 +-
 .../asterix/drivers/AsterixClientDriver.java    |    4 +-
 .../asterix/drivers/AsterixWebServer.java       |    8 +-
 .../bootstrap/CCApplicationEntryPoint.java      |   60 +-
 .../bootstrap/GlobalRecoveryManager.java        |   28 +-
 .../bootstrap/NCApplicationEntryPoint.java      |    5 +-
 .../asterix/messaging/CCMessageBroker.java      |    4 +-
 .../asterix/messaging/NCMessageBroker.java      |   21 +-
 .../apache/asterix/result/ResultPrinter.java    |  188 --
 .../org/apache/asterix/result/ResultReader.java |   65 -
 .../org/apache/asterix/result/ResultUtils.java  |  243 --
 .../app/bootstrap/TestNodeController.java       |   92 +-
 .../aql/translator/QueryTranslatorTest.java     |    6 +-
 .../org/apache/asterix/test/dml/DmlTest.java    |    6 +-
 .../asterix/test/optimizer/OptimizerTest.java   |    4 +-
 .../asterix/test/runtime/ExecutionTestUtil.java |   10 +-
 .../asterix/test/runtime/HDFSCluster.java       |   18 +-
 .../apache/asterix/common/api/ExtensionId.java  |   64 +
 .../common/api/IAsterixAppRuntimeContext.java   |    2 +-
 .../apache/asterix/common/api/IExtension.java   |   63 +
 .../asterix/common/app/SessionConfig.java       |  264 ++
 .../asterix/common/config/AsterixExtension.java |   41 +
 .../config/AsterixExtensionProperties.java      |   32 +
 .../common/config/AsterixProperties.java        |   54 +
 .../config/AsterixPropertiesAccessor.java       |  192 +-
 .../common/config/MetadataConstants.java        |   33 -
 .../asterix/common/exceptions/ErrorCode.java    |    2 +
 .../messaging/api/IApplicationMessage.java      |    3 +-
 .../apache/asterix/common/utils/ConfigUtil.java |   41 +
 .../src/main/resources/schema/asterix-conf.xsd  |   29 +
 .../external/feed/api/ISubscriberRuntime.java   |   27 -
 .../feed/dataflow/FeedRuntimeInputHandler.java  |    2 +-
 .../feed/management/FeedEventsListener.java     |    4 +-
 .../external/feed/runtime/AdapterExecutor.java  |    9 +-
 .../feed/runtime/AdapterRuntimeManager.java     |   22 +-
 .../feed/runtime/CollectionRuntime.java         |   27 +-
 .../external/feed/runtime/IngestionRuntime.java |   20 +-
 .../feed/runtime/SubscribableRuntime.java       |   17 +-
 .../asterix/external/indexing/ExternalFile.java |   23 +-
 .../FeedCollectOperatorDescriptor.java          |    4 +-
 .../FeedCollectOperatorNodePushable.java        |   19 +-
 .../FeedIntakeOperatorNodePushable.java         |    8 +-
 .../FeedMessageOperatorNodePushable.java        |   13 +-
 .../operators/FeedMetaComputeNodePushable.java  |   23 +-
 .../operators/FeedMetaStoreNodePushable.java    |   33 +-
 .../lang/aql/rewrites/AqlQueryRewriter.java     |   10 +-
 .../aql/statement/SubscribeFeedStatement.java   |    5 +
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |    6 +-
 .../asterix/lang/common/base/Expression.java    |    1 -
 .../asterix/lang/common/base/Statement.java     |   23 +
 .../lang/common/statement/CompactStatement.java |    5 +
 .../common/statement/ConnectFeedStatement.java  |    5 +
 .../statement/CreateDataverseStatement.java     |    5 +
 .../statement/CreateFeedPolicyStatement.java    |    5 +
 .../common/statement/CreateFeedStatement.java   |    4 +
 .../statement/CreateFunctionStatement.java      |    5 +
 .../common/statement/CreateIndexStatement.java  |    5 +
 .../lang/common/statement/DatasetDecl.java      |    7 +-
 .../lang/common/statement/DataverseDecl.java    |    5 +
 .../statement/DataverseDropStatement.java       |    5 +
 .../lang/common/statement/DeleteStatement.java  |    5 +
 .../statement/DisconnectFeedStatement.java      |    5 +
 .../common/statement/DropDatasetStatement.java  |   65 +
 .../lang/common/statement/DropStatement.java    |   60 -
 .../common/statement/FeedDropStatement.java     |    5 +
 .../statement/FeedPolicyDropStatement.java      |    5 +
 .../lang/common/statement/FunctionDecl.java     |    5 +
 .../common/statement/FunctionDropStatement.java |    5 +
 .../common/statement/IndexDropStatement.java    |    5 +
 .../lang/common/statement/InsertStatement.java  |    5 +
 .../lang/common/statement/LoadStatement.java    |    5 +
 .../statement/NodeGroupDropStatement.java       |    5 +
 .../lang/common/statement/NodegroupDecl.java    |    5 +
 .../asterix/lang/common/statement/Query.java    |    5 +
 .../RefreshExternalDatasetStatement.java        |    5 +
 .../lang/common/statement/RunStatement.java     |    5 +
 .../lang/common/statement/SetStatement.java     |    5 +
 .../asterix/lang/common/statement/TypeDecl.java |    5 +
 .../common/statement/TypeDropStatement.java     |    5 +
 .../lang/common/statement/UpdateStatement.java  |    5 +
 .../lang/common/statement/WriteStatement.java   |    5 +
 .../lang/common/util/LangRecordParseUtil.java   |  168 +
 .../lang/common/visitor/FormatPrintVisitor.java |    6 +-
 .../base/AbstractQueryExpressionVisitor.java    |    4 +-
 .../lang/common/visitor/base/ILangVisitor.java  |    4 +-
 .../SqlppBuiltinFunctionRewriteVisitor.java     |    2 +-
 .../visitor/VariableCheckAndRewriteVisitor.java |    2 +-
 .../AbstractSqlppExpressionScopingVisitor.java  |    2 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj |   87 +-
 .../pom.xml                                     |  208 +-
 .../extension/grammar/GrammarExtensionMojo.java |  139 +-
 .../asterix/lang/extension/EchoStatement.java   |    5 +
 .../src/test/resources/lang/extension.jj        |    5 +
 .../basic-test/basic-test-plugin-config.xml     |  158 +-
 .../asterix/metadata/MetadataManager.java       |   36 +
 .../apache/asterix/metadata/MetadataNode.java   |  340 +-
 .../metadata/api/ExtensionMetadataDataset.java  |   51 +
 .../api/ExtensionMetadataDatasetId.java         |   62 +
 .../metadata/api/IExtensionMetadataEntity.java  |   34 +
 .../api/IExtensionMetadataSearchKey.java        |   43 +
 .../asterix/metadata/api/IMetadataEntity.java   |    3 +-
 .../api/IMetadataEntityTupleTranslator.java     |    3 +-
 .../metadata/api/IMetadataExtension.java        |   61 +
 .../asterix/metadata/api/IMetadataIndex.java    |    4 +-
 .../asterix/metadata/api/IMetadataManager.java  |  183 +-
 .../asterix/metadata/api/IMetadataNode.java     |  155 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |  148 +-
 .../metadata/bootstrap/MetadataIndex.java       |   18 +-
 .../MetadataIndexImmutableProperties.java       |   44 +-
 .../bootstrap/MetadataPrimaryIndexes.java       |  195 +-
 .../metadata/bootstrap/MetadataRecordTypes.java |    4 +-
 .../metadata/declared/AqlDataSource.java        |   13 +
 .../metadata/declared/AqlMetadataProvider.java  |  467 ++-
 .../metadata/declared/DatasetDataSource.java    |   72 +-
 .../metadata/declared/FeedDataSource.java       |   65 +-
 .../metadata/declared/IMutationDataSource.java  |   34 +
 .../metadata/declared/LoadableDataSource.java   |   32 +
 .../asterix/metadata/entities/Dataset.java      |   21 +-
 .../asterix/metadata/entities/Dataverse.java    |    5 +
 .../entities/InternalDatasetDetails.java        |   16 +-
 .../AbstractTupleTranslator.java                |    9 +-
 .../CompactionPolicyTupleTranslator.java        |    4 +-
 .../DatasetTupleTranslator.java                 |   41 +-
 .../DatasourceAdapterTupleTranslator.java       |    9 +-
 .../DatatypeTupleTranslator.java                |   24 +-
 .../DataverseTupleTranslator.java               |    4 +-
 .../ExternalFileTupleTranslator.java            |   10 +-
 .../FeedPolicyTupleTranslator.java              |    6 +-
 .../FeedTupleTranslator.java                    |   19 +-
 .../FunctionTupleTranslator.java                |   19 +-
 .../IndexTupleTranslator.java                   |   20 +-
 .../LibraryTupleTranslator.java                 |   14 +-
 .../MetadataTupleTranslatorProvider.java        |   78 +
 .../NodeGroupTupleTranslator.java               |    9 +-
 .../NodeTupleTranslator.java                    |    4 +-
 .../metadata/feeds/BuiltinFeedPolicies.java     |    5 +-
 .../metadata/feeds/FeedMetadataUtil.java        |  161 +-
 .../metadata/feeds/LocationConstraint.java      |   29 +
 .../asterix/metadata/utils/DatasetUtils.java    |    5 +-
 .../metadata/utils/MetadataConstants.java       |   35 +
 .../utils/SplitsAndConstraintsUtil.java         |   42 +-
 .../DatasetTupleTranslatorTest.java             |    9 +-
 .../IndexTupleTranslatorTest.java               |    9 +-
 .../org/apache/asterix/om/base/AString.java     |   10 +-
 .../apache/asterix/om/types/ARecordType.java    |   14 +-
 .../asterix/om/util/AsterixAppContextInfo.java  |   24 +-
 ...tiTransactionJobletEventListenerFactory.java |   84 +
 .../server/test/NCServiceExecutionIT.java       |   14 +-
 asterixdb/pom.xml                               |   13 +
 .../algebra/functions/FunctionIdentifier.java   |    3 +-
 .../api/application/IApplicationConfig.java     |   13 +
 .../apache/hyracks/api/dataflow/ActivityId.java |   11 +-
 .../api/dataflow/IConnectorDescriptor.java      |    7 +-
 .../api/dataflow/IOperatorDescriptor.java       |   12 +-
 .../hyracks/api/exceptions/ErrorCode.java       |    1 +
 .../hyracks/api/job/JobSpecification.java       |   35 +-
 .../application/IniApplicationConfig.java       |   31 +-
 .../control/common/controllers/IniUtils.java    |   23 +-
 .../std/base/AbstractConnectorDescriptor.java   |   15 +-
 .../std/base/AbstractOperatorDescriptor.java    |   14 +-
 ...bstractSingleActivityOperatorDescriptor.java |    9 +
 ...wareMToNPartitioningConnectorDescriptor.java |    6 +-
 .../MToNBroadcastConnectorDescriptor.java       |    7 +-
 .../MToNPartitioningConnectorDescriptor.java    |    2 +-
 ...NPartitioningMergingConnectorDescriptor.java |    8 +-
 .../connectors/OneToOneConnectorDescriptor.java |    2 +-
 .../std/misc/SplitOperatorDescriptor.java       |   18 +-
 233 files changed, 9719 insertions(+), 6982 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index e5ccd05..c907a36 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -18,16 +18,14 @@
  */
 package org.apache.asterix.active;
 
-import java.io.Serializable;
-
 import org.apache.hyracks.api.job.JobId;
 
 public class ActiveEvent {
 
     private final JobId jobId;
     private final EntityId entityId;
-    private final Serializable payload;
     private final EventKind eventKind;
+    private final Object eventObject;
 
     public enum EventKind {
         JOB_START,
@@ -35,34 +33,30 @@ public class ActiveEvent {
         PARTITION_EVENT
     }
 
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind) {
-        this(jobId, eventKind, null, null);
-    }
-
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId feedId) {
-        this(jobId, eventKind, feedId, null);
-    }
-
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId feedId, Serializable payload) {
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId entityId, Object eventObject) {
         this.jobId = jobId;
+        this.entityId = entityId;
         this.eventKind = eventKind;
-        this.entityId = feedId;
-        this.payload = payload;
+        this.eventObject = eventObject;
+    }
+
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId entityId) {
+        this(jobId, eventKind, entityId, null);
     }
 
     public JobId getJobId() {
         return jobId;
     }
 
-    public EntityId getFeedId() {
+    public EntityId getEntityId() {
         return entityId;
     }
 
-    public Serializable getPayload() {
-        return payload;
-    }
-
     public EventKind getEventKind() {
         return eventKind;
     }
+
+    public Object getEventObject() {
+        return eventObject;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 9c69aca..30a2eb6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -55,8 +55,8 @@ public class ActiveJobNotificationHandler implements Runnable {
                 EntityId entityId = jobInfo.getEntityId();
                 IActiveEntityEventsListener listener = entityEventListener.get(entityId);
                 if (DEBUG) {
-                    LOGGER.log(Level.INFO, "Next event is of type " + event.getEventKind());
-                    LOGGER.log(Level.INFO, "Notifying the listener");
+                    LOGGER.log(Level.WARNING, "Next event is of type " + event.getEventKind());
+                    LOGGER.log(Level.WARNING, "Notifying the listener");
                 }
                 listener.notify(event);
                 if (event.getEventKind() == EventKind.JOB_FINISH) {
@@ -74,7 +74,7 @@ public class ActiveJobNotificationHandler implements Runnable {
 
     private void removeFinishedJob(JobId jobId) {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "Removing the job");
+            LOGGER.log(Level.WARNING, "Removing the job");
         }
         jobId2ActiveJobInfos.remove(jobId);
     }
@@ -82,7 +82,7 @@ public class ActiveJobNotificationHandler implements Runnable {
     private void removeInactiveListener(IActiveEntityEventsListener listener) {
         if (!listener.isEntityActive()) {
             if (DEBUG) {
-                LOGGER.log(Level.INFO, "Removing the listener since it is not active anymore");
+                LOGGER.log(Level.WARNING, "Removing the listener since it is not active anymore");
             }
             entityEventListener.remove(listener.getEntityId());
         }
@@ -90,40 +90,45 @@ public class ActiveJobNotificationHandler implements Runnable {
 
     public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
             IActiveEntityEventsListener listener = entityEventListener.get(entityId);
-            LOGGER.log(Level.INFO, "Listener found: " + listener);
+            LOGGER.log(Level.WARNING, "Listener found: " + listener);
         }
         return entityEventListener.get(entityId);
     }
 
     public synchronized ActiveJob[] getActiveJobs() {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "getActiveJobs()  was called");
-            LOGGER.log(Level.INFO, "Number of jobs found: " + jobId2ActiveJobInfos.size());
+            LOGGER.log(Level.WARNING, "getActiveJobs()  was called");
+            LOGGER.log(Level.WARNING, "Number of jobs found: " + jobId2ActiveJobInfos.size());
         }
         return jobId2ActiveJobInfos.values().toArray(new ActiveJob[jobId2ActiveJobInfos.size()]);
     }
 
     public boolean isActiveJob(JobId jobId) {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "isActiveJob(JobId jobId) called with jobId: " + jobId);
+            LOGGER.log(Level.WARNING, "isActiveJob(JobId jobId) called with jobId: " + jobId);
             boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
         }
         return jobId2ActiveJobInfos.get(jobId) != null;
     }
 
+    public EntityId getEntity(JobId jobId) {
+        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
+        return jobInfo == null ? null : jobInfo.getEntityId();
+    }
+
     public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
         if (DEBUG) {
-            LOGGER.log(Level.INFO,
+            LOGGER.log(Level.WARNING,
                     "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
                             + jobId);
         }
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (property == null || !(property instanceof ActiveJob)) {
             if (DEBUG) {
-                LOGGER.log(Level.INFO, "Job was is not active. property found to be: " + property);
+                LOGGER.log(Level.WARNING, "Job was is not active. property found to be: " + property);
             }
             return;
         } else {
@@ -131,7 +136,7 @@ public class ActiveJobNotificationHandler implements Runnable {
         }
         if (DEBUG) {
             boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
         }
         ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
         if (jobInfo != null) {
@@ -139,11 +144,12 @@ public class ActiveJobNotificationHandler implements Runnable {
             IActiveEntityEventsListener listener = entityEventListener.get(entityId);
             listener.notifyJobCreation(jobId, jobSpecification);
             if (DEBUG) {
-                LOGGER.log(Level.INFO, "Listener was notified" + jobId);
+                LOGGER.log(Level.WARNING, "Listener was notified" + jobId);
             }
         } else {
             if (DEBUG) {
-                LOGGER.log(Level.INFO, "Listener was not notified since it was not registered for the job " + jobId);
+                LOGGER.log(Level.WARNING,
+                        "Listener was not notified since it was not registered for the job " + jobId);
             }
         }
     }
@@ -154,16 +160,17 @@ public class ActiveJobNotificationHandler implements Runnable {
 
     public synchronized IActiveEntityEventsListener[] getEventListeners() {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "getEventListeners() was called");
-            LOGGER.log(Level.INFO, "returning " + entityEventListener.size() + " Listeners");
+            LOGGER.log(Level.WARNING, "getEventListeners() was called");
+            LOGGER.log(Level.WARNING, "returning " + entityEventListener.size() + " Listeners");
         }
         return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]);
     }
 
     public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
-                    + listener.getEntityId());
+            LOGGER.log(Level.WARNING,
+                    "registerListener(IActiveEntityEventsListener listener) was called for the entity "
+                            + listener.getEntityId());
         }
         if (entityEventListener.containsKey(listener.getEntityId())) {
             throw new HyracksDataException(
@@ -174,9 +181,9 @@ public class ActiveJobNotificationHandler implements Runnable {
 
     public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
         if (DEBUG) {
-            LOGGER.log(Level.INFO, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
             boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
         }
         if (entityEventListener.containsKey(activeJob.getEntityId())) {
             if (jobId2ActiveJobInfos.containsKey(jobId)) {
@@ -184,7 +191,7 @@ public class ActiveJobNotificationHandler implements Runnable {
                 return;
             }
             if (DEBUG) {
-                LOGGER.log(Level.INFO, "monitoring started for job id: " + jobId);
+                LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId);
             }
             jobId2ActiveJobInfos.put(jobId, activeJob);
         } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 5992294..2f34465 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -21,29 +21,23 @@ package org.apache.asterix.active;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
 
 public class ActiveManager {
 
+    private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
     private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
-
-    private final IActiveRuntimeRegistry activeRuntimeRegistry;
-
     private final ConcurrentFramePool activeFramePool;
-
     private final String nodeId;
 
     public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException {
         this.nodeId = nodeId;
-        this.activeRuntimeRegistry = new ActiveRuntimeRegistry(nodeId);
         this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
         this.runtimes = new ConcurrentHashMap<>();
     }
 
-    public IActiveRuntimeRegistry getActiveRuntimeRegistry() {
-        return activeRuntimeRegistry;
-    }
-
     public ConcurrentFramePool getFramePool() {
         return activeFramePool;
     }
@@ -59,12 +53,37 @@ public class ActiveManager {
         runtimes.remove(id);
     }
 
-    public IActiveRuntime getSubscribableRuntime(ActiveRuntimeId subscribableRuntimeId) {
-        return runtimes.get(subscribableRuntimeId);
+    public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
+        return runtimes.get(runtimeId);
     }
 
     @Override
     public String toString() {
         return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
     }
+
+    public void submit(ActiveManagerMessage message) {
+        switch (message.getKind()) {
+            case ActiveManagerMessage.STOP_ACTIVITY:
+                stopRuntime(message);
+                break;
+            default:
+                LOGGER.warn("Unknown message type received");
+        }
+    }
+
+    private void stopRuntime(ActiveManagerMessage message) {
+        ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+        IActiveRuntime runtime = runtimes.get(runtimeId);
+        if (runtime == null) {
+            LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId);
+        } else {
+            try {
+                runtime.stop();
+            } catch (HyracksDataException | InterruptedException e) {
+                // TODO(till) Figure out a better way to handle failure to stop a runtime
+                LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
deleted file mode 100644
index 8875647..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.Serializable;
-
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.hyracks.api.job.JobId;
-
-public class ActivePartitionMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final EntityId feedId;
-    private final JobId jobId;
-    private final Serializable payload;
-
-    public ActivePartitionMessage(EntityId feedId, JobId jobId, Serializable payload) {
-        this.feedId = feedId;
-        this.jobId = jobId;
-        this.payload = payload;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.ACTIVE_ENTITY_MESSAGE;
-    }
-
-    public EntityId getFeedId() {
-        return feedId;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public Serializable getPayload() {
-        return payload;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
deleted file mode 100644
index 8b0914a..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-public class ActiveRuntime implements IActiveRuntime {
-
-    /** A unique identifier for the runtime **/
-    protected final ActiveRuntimeId runtimeId;
-
-    public ActiveRuntime(ActiveRuntimeId runtimeId) {
-        this.runtimeId = runtimeId;;
-    }
-
-    @Override
-    public ActiveRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    @Override
-    public String toString() {
-        return runtimeId.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
index 64926fd..f1f5876 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
@@ -25,20 +25,20 @@ public class ActiveRuntimeId implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final EntityId entityId;
-    private final String runtimeId;
+    private final String runtimeName;
     private final int partition;
     private final int hashCode;
 
-    public ActiveRuntimeId(EntityId entityId, String runtimeId, int partition) {
+    public ActiveRuntimeId(EntityId entityId, String runtimeName, int partition) {
         this.entityId = entityId;
-        this.runtimeId = runtimeId;
+        this.runtimeName = runtimeName;
         this.partition = partition;
         this.hashCode = toString().hashCode();
     }
 
     @Override
     public String toString() {
-        return "(" + entityId + ")" + "[" + partition + "]:" + runtimeId;
+        return "(" + entityId + ")" + "[" + partition + "]:" + runtimeName;
     }
 
     @Override
@@ -50,7 +50,7 @@ public class ActiveRuntimeId implements Serializable {
             return false;
         }
         ActiveRuntimeId other = (ActiveRuntimeId) o;
-        return other.entityId.equals(entityId) && other.getFeedRuntimeType().equals(runtimeId)
+        return other.entityId.equals(entityId) && other.getRuntimeName().equals(runtimeName)
                 && other.getPartition() == partition;
     }
 
@@ -59,19 +59,15 @@ public class ActiveRuntimeId implements Serializable {
         return hashCode;
     }
 
-    public String getFeedRuntimeType() {
-        return runtimeId;
+    public String getRuntimeName() {
+        return runtimeName;
     }
 
     public int getPartition() {
         return partition;
     }
 
-    public String getRuntimeType() {
-        return runtimeId;
-    }
-
-    public EntityId getFeedId() {
+    public EntityId getEntityId() {
         return entityId;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
index 9743856..e71367a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
@@ -31,7 +31,7 @@ import java.util.logging.Logger;
 public class ActiveRuntimeManager {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
-    private final Map<ActiveRuntimeId, ActiveRuntime> activeRuntimes;
+    private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> activeRuntimes;
 
     private final ExecutorService executorService;
 
@@ -61,11 +61,11 @@ public class ActiveRuntimeManager {
         }
     }
 
-    public ActiveRuntime getFeedRuntime(ActiveRuntimeId runtimeId) {
+    public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId runtimeId) {
         return activeRuntimes.get(runtimeId);
     }
 
-    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveRuntime feedRuntime) {
+    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime) {
         activeRuntimes.put(runtimeId, feedRuntime);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
deleted file mode 100644
index 050426c..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * An implementation of the {@code IActiveRuntimeRegistry} interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with an active entity.
- */
-public class ActiveRuntimeRegistry implements IActiveRuntimeRegistry {
-
-    private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeRegistry.class.getName());
-
-    private Map<ActiveRuntimeId, ActiveRuntimeManager> activeRuntimeManagers = new HashMap<>();
-    private final String nodeId;
-
-    public ActiveRuntimeRegistry(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    @Override
-    public void deregisterRuntime(ActiveRuntimeId runtimeId) {
-        try {
-            ActiveRuntimeManager mgr = activeRuntimeManagers.get(runtimeId);
-            if (mgr != null) {
-                mgr.deregisterRuntime(runtimeId);
-                mgr.close();
-                activeRuntimeManagers.remove(runtimeId);
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "Exception in closing feed runtime" + e.getMessage(), e);
-            }
-        }
-
-    }
-
-    @Override
-    public synchronized void registerRuntime(ActiveRuntime runtime) {
-        ActiveRuntimeManager runtimeMgr = activeRuntimeManagers.get(runtime.getRuntimeId());
-        if (runtimeMgr == null) {
-            runtimeMgr = new ActiveRuntimeManager();
-            activeRuntimeManagers.put(runtime.getRuntimeId(), runtimeMgr);
-        }
-        runtimeMgr.registerRuntime(runtime.getRuntimeId(), runtime);
-    }
-
-    @Override
-    public ActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
-        ActiveRuntimeManager runtimeMgr = activeRuntimeManagers.get(runtimeId);
-        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(runtimeId) : null;
-    }
-
-    @Override
-    public String toString() {
-        return ActiveRuntimeRegistry.class.getSimpleName() + "[" + nodeId + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
new file mode 100644
index 0000000..1cda298
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+
+public abstract class ActiveSourceOperatorNodePushable extends AbstractOperatorNodePushable implements IActiveRuntime {
+
+    protected final IHyracksTaskContext ctx;
+    protected final ActiveManager activeManager;
+    /** A unique identifier for the runtime **/
+    protected final ActiveRuntimeId runtimeId;
+    private volatile boolean done = false;
+
+    public ActiveSourceOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId) {
+        this.ctx = ctx;
+        activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getActiveManager();
+        this.runtimeId = runtimeId;
+    }
+
+    @Override
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    /**
+     * Starts the active job. This method must not return until the job has finished
+     *
+     * @throws HyracksDataException
+     * @throws InterruptedException
+     */
+    protected abstract void start() throws HyracksDataException, InterruptedException;
+
+    @Override
+    public final void stop() throws HyracksDataException, InterruptedException {
+        try {
+            abort();
+        } finally {
+            if (!done) {
+                synchronized (this) {
+                    while (!done) {
+                        wait();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Called from a different thread. This method stops the active node and forces the start() call to return
+     *
+     * @throws HyracksDataException
+     * @throws InterruptedException
+     */
+    protected abstract void abort() throws HyracksDataException, InterruptedException;
+
+    @Override
+    public String toString() {
+        return runtimeId.toString();
+    }
+
+    @Override
+    public final void initialize() throws HyracksDataException {
+        activeManager.registerRuntime(this);
+        try {
+            // notify cc that runtime has been registered
+            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
+                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
+            start();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            synchronized (this) {
+                done = true;
+                notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public final void deinitialize() throws HyracksDataException {
+        activeManager.deregisterRuntime(runtimeId);
+        try {
+            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
+                    ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public final int getInputArity() {
+        return 0;
+    }
+
+    @Override
+    public final IFrameWriter getInputFrameWriter(int index) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 32c5c50..528c220 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -18,11 +18,20 @@
  */
 package org.apache.asterix.active;
 
-@FunctionalInterface
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 public interface IActiveRuntime {
 
     /**
-     * @return the unique runtime id associated with the feedRuntime
+     * @return the unique runtime id associated with the active runtime
+     */
+    ActiveRuntimeId getRuntimeId();
+
+    /**
+     * Stops the running activity
+     *
+     * @throws HyracksDataException
+     * @throws InterruptedException
      */
-    public ActiveRuntimeId getRuntimeId();
+    void stop() throws HyracksDataException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
deleted file mode 100644
index b2c6f8e..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.IOException;
-
-public interface IActiveRuntimeRegistry {
-
-    /**
-     * Allows registration of a runtime.
-     *
-     * @param runtime
-     * @throws Exception
-     */
-    public void registerRuntime(ActiveRuntime runtime);
-
-    /**
-     * Obtain runtime corresponding to a feedRuntimeId
-     *
-     * @param runtimeId
-     * @return
-     */
-    public ActiveRuntime getRuntime(ActiveRuntimeId runtimeId);
-
-    /**
-     * De-register a feed
-     *
-     * @param feedConnection
-     * @throws IOException
-     */
-    void deregisterRuntime(ActiveRuntimeId runtimeId);
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
new file mode 100644
index 0000000..a6e1788
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+
+public class ActiveManagerMessage extends AbstractApplicationMessage {
+    public static final byte STOP_ACTIVITY = 0x00;
+
+    private static final long serialVersionUID = 1L;
+    private final byte kind;
+    private final String src;
+    private final Serializable payload;
+
+    public ActiveManagerMessage(byte kind, String src, Serializable payload) {
+        this.kind = kind;
+        this.src = src;
+        this.payload = payload;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.ACTIVE_MANAGER_MESSAGE;
+    }
+
+    public Serializable getPayload() {
+        return payload;
+    }
+
+    public byte getKind() {
+        return kind;
+    }
+
+    public String getSrc() {
+        return src;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
new file mode 100644
index 0000000..f5bdf39
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.job.JobId;
+
+public class ActivePartitionMessage extends AbstractApplicationMessage {
+
+    public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
+    public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
+    private static final long serialVersionUID = 1L;
+    private final ActiveRuntimeId activeRuntimeId;
+    private final JobId jobId;
+    private final Serializable payload;
+    private final byte event;
+
+    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event) {
+        this(activeRuntimeId, jobId, event, null);
+    }
+
+    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) {
+        this.activeRuntimeId = activeRuntimeId;
+        this.jobId = jobId;
+        this.event = event;
+        this.payload = payload;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.ACTIVE_ENTITY_TO_CC_MESSAGE;
+    }
+
+    public ActiveRuntimeId getActiveRuntimeId() {
+        return activeRuntimeId;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public Serializable getPayload() {
+        return payload;
+    }
+
+    public byte getEvent() {
+        return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExtension.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExtension.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExtension.java
new file mode 100644
index 0000000..ac3bc03
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExtension.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.algebra.base;
+
+import org.apache.asterix.common.api.IExtension;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+/**
+ * An interface for language extensions
+ */
+public interface ILangExtension extends IExtension {
+
+    public enum Language {
+        AQL,
+        SQLPP
+    }
+
+    @Override
+    default ExtensionKind getExtensionKind() {
+        return ExtensionKind.LANG;
+    }
+
+    ILangCompilationProvider getLangCompilationProvider(Language lang);
+
+    //TODO(amoudi/yingyi) this is not a good way to extend re-write rules. introduce rewrite-rule-provider.
+    /**
+     * Called by the compiler when the unnest function is an extension function.
+     * Provides a way to add additional types of datasources
+     *
+     * @param opRef
+     * @param context
+     * @param unnestOp
+     * @param unnestExpr
+     * @param functionCallExpr
+     * @return true if transformation was successful, false otherwise
+     * @throws AlgebricksException
+     */
+    boolean unnestToDataScan(Mutable<ILogicalOperator> opRef, IOptimizationContext context, UnnestOperator unnestOp,
+            ILogicalExpression unnestExpr, AbstractFunctionCallExpression functionCallExpr) throws AlgebricksException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionFunctionIdentifier.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionFunctionIdentifier.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionFunctionIdentifier.java
new file mode 100644
index 0000000..ae67337
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionFunctionIdentifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.extension;
+
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * An identifier for an extension function
+ */
+public class ExtensionFunctionIdentifier extends FunctionIdentifier {
+    //TODO (till): remove this class
+
+    private static final long serialVersionUID = 1L;
+    private final ExtensionId extensionId;
+
+    /**
+     * Create an identifier for an extension function
+     *
+     * @param namespace
+     * @param name
+     * @param extensionId
+     */
+    public ExtensionFunctionIdentifier(String namespace, String name, ExtensionId extensionId) {
+        super(namespace, name);
+        this.extensionId = extensionId;
+    }
+
+    /**
+     * Create an identifier for an extension function
+     *
+     * @param namespace
+     * @param name
+     * @param arity
+     * @param extensionId
+     */
+    public ExtensionFunctionIdentifier(String namespace, String name, int arity, ExtensionId extensionId) {
+        super(namespace, name, arity);
+        this.extensionId = extensionId;
+    }
+
+    public ExtensionId getExtensionId() {
+        return extensionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode() + extensionId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o instanceof ExtensionFunctionIdentifier) {
+            ExtensionFunctionIdentifier oId = (ExtensionFunctionIdentifier) o;
+            return super.equals(o) && extensionId.equals(oId.getExtensionId());
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IAlgebraExtensionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IAlgebraExtensionManager.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IAlgebraExtensionManager.java
new file mode 100644
index 0000000..0a8402c
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IAlgebraExtensionManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.extension;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public interface IAlgebraExtensionManager {
+
+    boolean unnestToDataScan(Mutable<ILogicalOperator> opRef, IOptimizationContext context, UnnestOperator unnestOp,
+            ILogicalExpression unnestExpr, AbstractFunctionCallExpression functionCallExpr) throws AlgebricksException;
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
new file mode 100644
index 0000000..9e2241f
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.extension;
+
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+/**
+ * An interface that provides an extension mechanism to extend a language with additional statements.
+ */
+public interface IExtensionStatement extends Statement {
+
+    @Override
+    default byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    /**
+     * Called when the {@code IStatementExecutor} encounters an extension statement.
+     * An implementation class should implement the actual processing of the statement in this method.
+     *
+     * @param statementExecutor
+     * @param metadataProvider
+     * @param hcc
+     *            the Hyracks client connection
+     * @throws Exception
+     */
+    void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 720de76..b29f743 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.base;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
 import org.apache.asterix.optimizer.rules.AddEquivalenceClassForRecordConstructorRule;
 import org.apache.asterix.optimizer.rules.AsterixExtractFunctionsFromJoinConditionRule;
 import org.apache.asterix.optimizer.rules.AsterixInlineVariablesRule;
@@ -152,7 +153,9 @@ public final class RuleCollections {
         return autogen;
     }
 
-    public static final List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
+    //TODO(amoudi/yingyi): refactor this to use a provider instead of passing the extensionManager
+    public static final List<IAlgebraicRewriteRule> buildNormalizationRuleCollection(
+            IAlgebraExtensionManager algebraExtensionManager) {
         List<IAlgebraicRewriteRule> normalization = new LinkedList<>();
         normalization.add(new ResolveVariableRule());
         normalization.add(new IntroduceUnnestForCollectionToSequenceRule());
@@ -174,7 +177,7 @@ public final class RuleCollections {
         normalization.add(new ExtractCommonExpressionsRule());
         normalization.add(new ConstantFoldingRule());
         normalization.add(new RemoveRedundantSelectRule());
-        normalization.add(new UnnestToDataScanRule());
+        normalization.add(new UnnestToDataScanRule(algebraExtensionManager));
         normalization.add(new MetaFunctionToMetaVariableRule());
         normalization.add(new FuzzyEqRule());
         normalization.add(new SimilarityCheckRule());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index 605ddb4..1fa7730 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
-import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.declared.IMutationDataSource;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.IAType;
@@ -86,11 +86,11 @@ public class MetaFunctionToMetaVariableRule implements IAlgebraicRewriteRule {
             LogicalVariable metaVar = dataSource.getMetaVariable(allVars);
             LogicalExpressionReferenceTransform currentTransformer = null;
             if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) {
-                FeedDataSource fds = (FeedDataSource) dataSource;
-                if (fds.isChange()) {
+                IMutationDataSource mds = (IMutationDataSource) dataSource;
+                if (mds.isChange()) {
                     transformers = new ArrayList<>();
-                    transformers.add(new MetaKeyExpressionReferenceTransform(fds.getPkVars(allVars),
-                            fds.getKeyAccessExpression()));
+                    transformers.add(new MetaKeyExpressionReferenceTransform(mds.getPkVars(allVars),
+                            mds.getKeyAccessExpression()));
                 } else if (metaVar != null) {
                     transformers = new ArrayList<>();
                     transformers.add(new MetaKeyToFieldAccessTransform(metaVar));
@@ -98,7 +98,7 @@ public class MetaFunctionToMetaVariableRule implements IAlgebraicRewriteRule {
             }
             if (!dataSource.hasMeta() && transformers == null) {
                 return inputTransfomer;
-            };
+            }
             if (metaVar != null) {
                 currentTransformer = new LogicalExpressionReferenceTransform(dataVar, metaVar);
             }
@@ -286,7 +286,7 @@ class MetaKeyToFieldAccessTransform implements ILogicalExpressionReferenceTransf
                 throw new AlgebricksException("Unsupported field name type " + fieldNameType.getTypeTag());
         }
         IFunctionInfo finfoAccess = FunctionUtil.getFunctionInfo(functionIdentifier);
-        ArrayList<Mutable<ILogicalExpression>> argExprs = new ArrayList<Mutable<ILogicalExpression>>(2);
+        ArrayList<Mutable<ILogicalExpression>> argExprs = new ArrayList<>(2);
         argExprs.add(new MutableObject<>(new VariableReferenceExpression(metaVar)));
         argExprs.add(new MutableObject<>(fieldNameExpression));
         exprRef.setValue(new ScalarFunctionCallExpression(finfoAccess, argExprs));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index bb382f0..17dec7c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -18,11 +18,10 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import static org.apache.asterix.om.util.ConstantExpressionUtil.getStringArgument;
-
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -45,6 +44,7 @@ import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
 import org.apache.asterix.translator.util.PlanTranslationUtil;
 import org.apache.commons.lang3.mutable.Mutable;
@@ -68,6 +68,11 @@ import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependenc
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
+    private final IAlgebraExtensionManager algebraExtensionManager;
+
+    public UnnestToDataScanRule(IAlgebraExtensionManager algebraExtensionManager) {
+        this.algebraExtensionManager = algebraExtensionManager;
+    }
 
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -150,19 +155,17 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, variables, recordType,
                         metaRecordType, dataset, context);
                 return true;
-            }
-
-            if (fid.equals(AsterixBuiltinFunctions.FEED_COLLECT)) {
+            } else if (fid.equals(AsterixBuiltinFunctions.FEED_COLLECT)) {
                 if (unnest.getPositionalVariable() != null) {
                     throw new AlgebricksException("No positional variables are allowed over feeds.");
                 }
 
-                String dataverse = getStringArgument(f, 0);
-                String sourceFeedName = getStringArgument(f, 1);
-                String getTargetFeed = getStringArgument(f, 2);
-                String subscriptionLocation = getStringArgument(f, 3);
-                String targetDataset = getStringArgument(f, 4);
-                String outputType = getStringArgument(f, 5);
+                String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
+                String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
+                String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
+                String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
+                String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
+                String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
 
                 AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
 
@@ -190,10 +193,7 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 }
                 // Does it produce pk?
                 if (ds.isChange()) {
-                    int numOfPKs = ds.getPkTypes().size();
-                    for (int i = 0; i < numOfPKs; i++) {
-                        feedDataScanOutputVariables.addAll(pkVars);
-                    }
+                    feedDataScanOutputVariables.addAll(pkVars);
                 }
 
                 DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
@@ -202,8 +202,9 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 opRef.setValue(scan);
                 context.computeAndSetTypeEnvironmentForOperator(scan);
                 return true;
+            } else if (algebraExtensionManager != null) {
+                return algebraExtensionManager.unnestToDataScan(opRef, context, unnest, unnestExpr, f);
             }
-
         }
 
         return false;
@@ -297,6 +298,6 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
             dataverseName = datasetNameComponents[0];
             datasetName = datasetNameComponents[1];
         }
-        return new Pair<String, String>(dataverseName, datasetName);
+        return new Pair<>(dataverseName, datasetName);
     }
 }
\ No newline at end of file


Mime
View raw message