asterixdb-commits mailing list archives

From amo...@apache.org
Subject [8/9] asterixdb git commit: Feed Connection Refactoring
Date Sun, 19 Feb 2017 07:14:54 GMT
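
This commit removes the primary/secondary feed split and the CONNECT/SUBSCRIBE runtime machinery: CONNECT FEED becomes a pure metadata operation that records a FeedConnection entity, and the new START FEED / STOP FEED statements (see handleStartFeedStatement and handleStopFeedStatement in the patch below) drive the runtime. A minimal sketch of the new connect path, assuming the MetadataManager and FeedConnection signatures shown in the diff; the wrapper class and method are illustrative only, and lock/transaction handling is elided:

    // Illustrative sketch, not part of the commit. The MetadataManager and
    // FeedConnection calls mirror handleConnectFeedStatement in the diff below.
    import java.util.List;

    import org.apache.asterix.common.functions.FunctionSignature;
    import org.apache.asterix.metadata.MetadataManager;
    import org.apache.asterix.metadata.MetadataTransactionContext;
    import org.apache.asterix.metadata.entities.FeedConnection;
    import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

    final class ConnectFeedSketch {
        static void recordConnection(MetadataTransactionContext mdTxnCtx, String dataverseName, String feedName,
                String datasetName, List<FunctionSignature> appliedFunctions, String policyName,
                String outputTypeName) throws Exception {
            // A connection is now just a metadata entity; reject duplicates up front.
            FeedConnection fc =
                    MetadataManager.INSTANCE.getFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
            if (fc != null) {
                throw new AlgebricksException("Feed " + feedName + " is already connected to dataset " + datasetName);
            }
            // Persist the connection; the runtime pipeline is built later by START FEED.
            fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName,
                    outputTypeName);
            MetadataManager.INSTANCE.addFeedConnection(mdTxnCtx, fc);
        }
    }
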
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 978c2eb..f111c54 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,10 +42,10 @@ import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.app.external.FeedJoint;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.result.ResultUtil;
@@ -60,31 +60,18 @@ import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.api.IFeed.FeedType;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
-import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivityDetails;
-import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
-import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.IReturningStatement;
 import org.apache.asterix.lang.common.base.IRewriterFactory;
 import org.apache.asterix.lang.common.base.IStatementRewriter;
@@ -97,8 +84,6 @@ import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -121,11 +106,12 @@ import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement;
 import org.apache.asterix.lang.common.statement.RunStatement;
 import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.statement.TypeDecl;
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -141,14 +127,13 @@ import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.metadata.feeds.FeedOperations;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
@@ -165,11 +150,9 @@ import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor;
@@ -177,13 +160,14 @@ import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
+import org.apache.asterix.utils.FeedOperations;
 import org.apache.asterix.utils.FlushDatasetUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -221,8 +205,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
 
-    public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider,
-            IStorageComponentProvider componentProvider, ExecutorService executorService) {
+    public QueryTranslator(List<Statement> statements, SessionConfig conf,
+            ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
+            ExecutorService executorService) {
         this.statements = statements;
         this.sessionConfig = conf;
         this.componentProvider = componentProvider;
@@ -349,8 +334,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc, false);
                         break;
-                    case Statement.Kind.CREATE_PRIMARY_FEED:
-                    case Statement.Kind.CREATE_SECONDARY_FEED:
+                    case Statement.Kind.CREATE_FEED:
                         handleCreateFeedStatement(metadataProvider, stmt);
                         break;
                     case Statement.Kind.DROP_FEED:
@@ -360,13 +344,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         handleDropFeedPolicyStatement(metadataProvider, stmt);
                         break;
                     case Statement.Kind.CONNECT_FEED:
-                        handleConnectFeedStatement(metadataProvider, stmt, hcc);
+                        handleConnectFeedStatement(metadataProvider, stmt);
                         break;
                     case Statement.Kind.DISCONNECT_FEED:
-                        handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
+                        handleDisconnectFeedStatement(metadataProvider, stmt);
                         break;
-                    case Statement.Kind.SUBSCRIBE_FEED:
-                        handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
+                    case Statement.Kind.START_FEED:
+                        handleStartFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case Statement.Kind.STOP_FEED:
+                        handleStopFeedStatement(metadataProvider, stmt);
                         break;
                     case Statement.Kind.CREATE_FEED_POLICY:
                         handleCreateFeedPolicyStatement(metadataProvider, stmt);
@@ -702,12 +689,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName)
-            throws CompilationException {
+    protected void validateIfResourceIsActiveInFeed(Dataset dataset) throws CompilationException {
         StringBuilder builder = null;
         IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
+            if (listener.isEntityUsingDataset(dataset)) {
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -715,8 +701,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             }
         }
         if (builder != null) {
-            throw new CompilationException("Dataset " + dataverseName + "." + datasetName + " is currently being "
-                    + "fed into by the following active entities.\n" + builder.toString());
+            throw new CompilationException("Dataset " + dataset.getDataverseName() + "." + dataset.getDatasetName()
+                    + " is currently being " + "fed into by the following active entities.\n" + builder.toString());
         }
     }
 
@@ -907,7 +893,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                validateIfResourceIsActiveInFeed(dataverseName, datasetName);
+                validateIfResourceIsActiveInFeed(ds);
             } else {
                 // External dataset
                 // Check if the dataset is indexible
@@ -1204,11 +1190,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 EntityId activeEntityId = listener.getEntityId();
                 if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
                         && activeEntityId.getDataverse().equals(dataverseName)) {
-                    FeedEventsListener feedEventListener = (FeedEventsListener) listener;
-                    FeedConnectionId[] connections = feedEventListener.getConnections();
-                    for (FeedConnectionId conn : connections) {
-                        disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc);
-                    }
+                    stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
+                            metadataProvider);
                     // prepare job to remove feed log storage
                     jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE
                             .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
@@ -1316,20 +1299,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void disconnectFeedBeforeDelete(Identifier dvId, EntityId activeEntityId, FeedConnectionId conn,
-            MetadataProvider metadataProvider, IHyracksClientConnection hcc) {
-        DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId,
-                new Identifier(activeEntityId.getEntityName()), new Identifier(conn.getDatasetName()));
+    protected void stopFeedBeforeDelete(Pair<Identifier, Identifier> feedNameComp, MetadataProvider metadataProvider) {
+        StopFeedStatement disStmt = new StopFeedStatement(feedNameComp);
         try {
-            handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+            handleStopFeedStatement(metadataProvider, disStmt);
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Disconnected feed " + activeEntityId.getEntityName() + " from dataset "
-                        + conn.getDatasetName());
+                LOGGER.info("Stopped feed " + feedNameComp.second.getValue());
             }
         } catch (Exception exception) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to disconnect feed " + activeEntityId.getEntityName() + " from dataset "
-                        + conn.getDatasetName() + ". Encountered exception " + exception);
+                LOGGER.warning("Unable to stop feed " + feedNameComp.second.getValue() + exception);
             }
         }
     }
@@ -1407,7 +1386,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(dataverseName, datasetName) && listener.isEntityActive()) {
+                if (listener.isEntityUsingDataset(ds)) {
                     throw new CompilationException(
                             "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
                 }
@@ -1523,7 +1502,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(ds)) {
                     if (builder == null) {
                         builder = new StringBuilder();
                     }
@@ -1991,22 +1970,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     throw new AlgebricksException("A feed with this name " + feedName + " already exists.");
                 }
             }
-
-            switch (stmt.getKind()) {
-                case Statement.Kind.CREATE_PRIMARY_FEED:
-                    CreatePrimaryFeedStatement cpfs = (CreatePrimaryFeedStatement) stmt;
-                    String adaptorName = cpfs.getAdaptorName();
-                    feed = new Feed(dataverseName, feedName, cfs.getAppliedFunction(), FeedType.PRIMARY, feedName,
-                            adaptorName, cpfs.getAdaptorConfiguration());
-                    break;
-                case Statement.Kind.CREATE_SECONDARY_FEED:
-                    CreateSecondaryFeedStatement csfs = (CreateSecondaryFeedStatement) stmt;
-                    feed = new Feed(dataverseName, feedName, csfs.getAppliedFunction(), FeedType.SECONDARY,
-                            csfs.getSourceFeedName(), null, null);
-                    break;
-                default:
-                    throw new IllegalStateException();
-            }
+            String adaptorName = cfs.getAdaptorName();
+            feed = new Feed(dataverseName, feedName, adaptorName, cfs.getAdaptorConfiguration());
             FeedMetadataUtil.validateFeed(feed, mdTxnCtx, metadataProvider.getLibraryManager());
             MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2103,12 +2068,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             FeedEventsListener listener =
                     (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId);
             if (listener != null) {
-                StringBuilder builder = new StringBuilder();
-                for (FeedConnectionId connectionId : listener.getConnections()) {
-                    builder.append(connectionId.getDatasetName() + "\n");
-                }
                 throw new AlgebricksException("Feed " + feedId
-                        + " is currently active and connected to the following dataset(s) \n" + builder.toString());
+                        + " is currently active and connected to the following dataset(s) \n" + listener.toString());
             } else {
                 JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(
                         MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
@@ -2156,305 +2117,159 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleConnectFeedStatement(MetadataProvider metadataProvider, Statement stmt,
+    private void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
-        ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
-        String dataverseName = getActiveDataverse(cfs.getDataverseName());
-        String feedName = cfs.getFeedName();
-        String datasetName = cfs.getDatasetName().getValue();
-        boolean bActiveTxn = true;
+        StartFeedStatement sfs = (StartFeedStatement) stmt;
+        String dataverseName = getActiveDataverse(sfs.getDataverseName());
+        String feedName = sfs.getFeedName().getValue();
+        // Transaction handler
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        metadataProvider.disableBlockingOperator();
-        boolean subscriberRegistered = false;
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        FeedConnectionId feedConnId = null;
-        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName());
+        // Runtime handler
+        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+        // Feed & Feed Connections
+        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
+                metadataProvider.getMetadataTxnContext());
+        List<FeedConnection> feedConnections = MetadataManager.INSTANCE
+                .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
+        ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+        IStorageComponentProvider storageComponentProvider = new StorageComponentProvider();
+        DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory();
         FeedEventsListener listener =
                 (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
-        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
-                dataverseName + "." + feedName);
+        if (listener != null) {
+            throw new AlgebricksException("Feed " + feedName + " is started already.");
+        }
+        // Start
         try {
-            metadataProvider.setWriteTransaction(true);
-            CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(),
-                    cfs.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
-            FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
-                    metadataProvider.getMetadataTxnContext());
-            Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(),
-                    metadataProvider.getMetadataTxnContext());
-            feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
-            if (listener != null) {
-                subscriberRegistered = listener.isFeedConnectionActive(feedConnId, eventSubscriber);
-            }
-            if (subscriberRegistered) {
-                throw new CompilationException("Feed " + cfs.getFeedName() + " is already connected to dataset "
-                        + cfs.getDatasetName().getValue());
-            }
-            FeedPolicyEntity feedPolicy =
-                    FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx);
-            // All Metadata checks have passed. Feed connect request is valid. //
-            if (listener == null) {
-                listener = new FeedEventsListener(entityId);
-                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
-            }
-            FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
-            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple =
-                    getFeedConnectionRequest(dataverseName, feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
-            FeedConnectionRequest connectionRequest = triple.first;
-            boolean createFeedIntakeJob = triple.second;
-            listener.registerFeedEventSubscriber(eventSubscriber);
-            subscriberRegistered = true;
-            if (createFeedIntakeJob) {
-                EntityId feedId = connectionRequest.getFeedJointKey().getFeedId();
-                Feed primaryFeed =
-                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName());
-                Pair<JobSpecification, IAdapterFactory> pair =
-                        FeedOperations.buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor);
-                // adapter configuration are valid at this stage
-                // register the feed joints (these are auto-de-registered)
-                int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length;
-                for (IFeedJoint fj : triple.third) {
-                    listener.registerFeedJoint(fj, numOfPrividers);
-                }
-                FeedIntakeInfo activeJob = new FeedIntakeInfo(null, ActivityState.ACTIVE, feed.getFeedId(),
-                        triple.third.get(0), pair.first);
-                pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
-                JobUtils.runJob(hcc, pair.first, false);
-                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
-            } else {
-                for (IFeedJoint fj : triple.third) {
-                    listener.registerFeedJoint(fj, 0);
-                }
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
-            if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
-                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
-            }
+            MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName,
+                    feedConnections);
+            // Collect the datasets of all recorded feed connections
+            List<IDataset> datasets = new ArrayList<>();
+            for (FeedConnection connection : feedConnections) {
+                datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, connection.getDataverseName(),
+                        connection.getDatasetName()));
+            }
+
+            org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
+                    FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections,
+                            compilationProvider, storageComponentProvider, qtFactory, hcc);
+
+            JobSpecification feedJob = jobInfo.getLeft();
+            listener = new FeedEventsListener(entityId, datasets, jobInfo.getRight().getLocations());
+            ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
+            feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
+            JobUtils.runJob(hcc, feedJob,
+                    Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
+            eventSubscriber.sync();
+            LOGGER.log(Level.INFO, "Submitted");
         } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
+            abort(e, e, mdTxnCtx);
+            if (listener != null) {
+                ActiveJobNotificationHandler.INSTANCE.unregisterListener(listener);
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                    dataverseName + "." + feedName);
-            if (subscriberRegistered) {
-                listener.deregisterFeedEventSubscriber(eventSubscriber);
-            }
+            MetadataLockManager.INSTANCE.startFeedEnd(dataverseName, dataverseName + "." + feedName, feedConnections);
         }
     }
 
-    /**
-     * Generates a subscription request corresponding to a connect feed request. In addition, provides a boolean
-     * flag indicating if feed intake job needs to be started (source primary feed not found to be active).
-     *
-     * @param dataverse
-     * @param feed
-     * @param dataset
-     * @param feedPolicy
-     * @param mdTxnCtx
-     * @return
-     * @throws CompilationException
-     */
-    protected Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
-            Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws CompilationException {
-        IFeedJoint sourceFeedJoint;
-        FeedConnectionRequest request;
-        List<String> functionsToApply = new ArrayList<>();
-        boolean needIntakeJob = false;
-        List<IFeedJoint> jointsToRegister = new ArrayList<>();
-        FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), dataset);
-        FeedRuntimeType connectionLocation;
-        FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx);
-        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverse, feed.getFeedName());
+    private void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+        StopFeedStatement sfst = (StopFeedStatement) stmt;
+        String dataverseName = getActiveDataverse(sfst.getDataverseName());
+        String feedName = sfst.getFeedName().getValue();
+        EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
+        // Obtain runtime info from ActiveListener
         FeedEventsListener listener =
-                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId);
         if (listener == null) {
-            throw new CompilationException("Feed Listener is not registered");
+            throw new AlgebricksException("Feed " + feedName + " is not started.");
         }
-
-        boolean isFeedJointAvailable = listener.isFeedJointAvailable(feedJointKey);
-        if (!isFeedJointAvailable) {
-            sourceFeedJoint = listener.getAvailableFeedJoint(feedJointKey);
-            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
-                connectionLocation = FeedRuntimeType.INTAKE;
-                EntityId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
-                Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getEntityName());
-                FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<>());
-                sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation,
-                        FeedJointType.INTAKE, connectionId);
-                jointsToRegister.add(sourceFeedJoint);
-                needIntakeJob = true;
-            } else {
-                connectionLocation = sourceFeedJoint.getConnectionLocation();
-            }
-
-            String[] functions = feedJointKey.getStringRep()
-                    .substring(sourceFeedJoint.getFeedJointKey().getStringRep().length()).trim().split(":");
-            for (String f : functions) {
-                if (f.trim().length() > 0) {
-                    functionsToApply.add(f);
-                }
-            }
-            // register the compute feed point that represents the final output from the collection of
-            // functions that will be applied.
-            if (!functionsToApply.isEmpty()) {
-                FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
-                IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
-                        FeedRuntimeType.COMPUTE, FeedJointType.COMPUTE, connectionId);
-                jointsToRegister.add(computeFeedJoint);
-            }
-        } else {
-            sourceFeedJoint = listener.getFeedJoint(feedJointKey);
-            connectionLocation = sourceFeedJoint.getConnectionLocation();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Feed joint " + sourceFeedJoint + " is available! need not apply any further computation");
-            }
-        }
-
-        request = new FeedConnectionRequest(sourceFeedJoint.getFeedJointKey(), connectionLocation, functionsToApply,
-                dataset, feedPolicy.getPolicyName(), feedPolicy.getProperties(), feed.getFeedId());
-
-        sourceFeedJoint.addConnectionRequest(request);
-        return new Triple<>(request, needIntakeJob, jointsToRegister);
-    }
-
-    /*
-     * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are
-     * available at this feed joint.
-     */
-    protected FeedJointKey getFeedJointKey(Feed feed, MetadataTransactionContext ctx) throws MetadataException {
-        Feed sourceFeed = feed;
-        List<String> appliedFunctions = new ArrayList<>();
-        while (sourceFeed.getFeedType().equals(IFeed.FeedType.SECONDARY)) {
-            if (sourceFeed.getAppliedFunction() != null) {
-                appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName());
-            }
-            Feed parentFeed =
-                    MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(), sourceFeed.getSourceFeedName());
-            sourceFeed = parentFeed;
-        }
-
-        if (sourceFeed.getAppliedFunction() != null) {
-            appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName());
+        IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STOPPED);
+        // Transaction
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.StopFeedBegin(dataverseName, feedName);
+        try {
+            // validate
+            FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx);
+            // Send a stop message to each intake location
+            for (int i = 0; i < listener.getSources().length; i++) {
+                String intakeLocation = listener.getSources()[i];
+                FeedOperations.SendStopMessageToNode(feedId, intakeLocation, i);
+            }
+            eventSubscriber.sync();
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.StopFeedEnd(dataverseName, feedName);
         }
-
-        return new FeedJointKey(sourceFeed.getFeedId(), appliedFunctions);
     }
 
-    protected void handleDisconnectFeedStatement(MetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
+    private void handleConnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+        FeedConnection fc;
+        ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
+        String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
+        String policyName = cfs.getPolicy();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
-        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
-        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
-        FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        FeedEventsListener listener =
-                (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
-        if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) {
-            throw new CompilationException("Feed " + feed.getFeedId().getEntityName()
-                    + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!");
-        }
-        listener.registerFeedEventSubscriber(eventSubscriber);
-        MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
-                dataverseName + "." + cfs.getFeedName());
+        // validation
+        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
+                metadataProvider.getMetadataTxnContext());
+        ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(),
+                ExternalDataConstants.KEY_TYPE_NAME);
+        List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions();
+        // Transaction handling
+        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
+                dataverseName + "." + feedName);
         try {
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
-                    dataverseName, cfs.getDatasetName().getValue());
-            if (dataset == null) {
-                throw new CompilationException(
-                        "Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName);
-            }
-            Pair<JobSpecification, Boolean> specDisconnectType =
-                    FeedOperations.buildDisconnectFeedJobSpec(connectionId);
-            JobSpecification jobSpec = specDisconnectType.first;
+            fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
+                    feedName, datasetName);
+            if (fc != null) {
+                throw new AlgebricksException("Feed" + feedName + " is already connected dataset " + datasetName);
+            }
+            fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName,
+                    outputType.toString());
+            MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-            JobUtils.runJob(hcc, jobSpec, true);
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
+            abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                    dataverseName + "." + cfs.getFeedName());
+            MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                    dataverseName + "." + feedName);
         }
     }
 
-    protected void handleSubscribeFeedStatement(MetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Subscriber Feed Statement :" + stmt);
-        }
-
+    protected void handleDisconnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+        DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
+        String dataverseName = getActiveDataverse(cfs.getDataverseName());
+        String datasetName = cfs.getDatasetName().getValue();
+        String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        metadataProvider.setWriteTransaction(true);
-        metadataProvider.disableBlockingOperator();
-        SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
-        bfs.initialize(metadataProvider.getMetadataTxnContext());
-
-        CompiledSubscribeFeedStatement csfs =
-                new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(), bfs.getVarCounter());
-        metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
-        metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy());
-        metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
-                StringUtils.join(bfs.getLocations(), ','));
-
-        JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, bfs.getQuery(), csfs);
-        FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(),
-                bfs.getSubscriptionRequest().getTargetDataset());
-        String dataverse = feedConnectionId.getFeedId().getDataverse();
-        String dataset = feedConnectionId.getDatasetName();
-        MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset,
-                dataverse + "." + feedConnectionId.getFeedId().getEntityName());
+        MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
+                dataverseName + "." + cfs.getFeedName());
         try {
-            JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(jobSpec, feedConnectionId,
-                    bfs.getSubscriptionRequest().getPolicyParameters());
-            FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, bfs.getPolicy());
-            if (policy == null) {
-                policy = BuiltinFeedPolicies.getFeedPolicy(bfs.getPolicy());
-                if (policy == null) {
-                    throw new AlgebricksException("Unknown feed policy:" + bfs.getPolicy());
-                }
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            if (jobSpec != null) {
-                FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
-                        .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
-                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(
-                        bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE,
-                        new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset),
-                        listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties());
-                alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
-                JobUtils.runJob(hcc, alteredJobSpec, false);
-            }
-
+            FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
+            FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
+            FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
+                    dataverseName, feedName, datasetName);
+            if (fc == null) {
+                throw new CompilationException("Feed " + feedName + " is currently not connected to "
+                        + cfs.getDatasetName().getValue() + ". Invalid operation!");
+            }
+            MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
         } catch (Exception e) {
-            LOGGER.log(Level.WARNING, e.getMessage(), e);
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
+            abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset,
-                    dataverse + "." + feedConnectionId.getFeedId().getEntityName());
+            MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                    dataverseName + "." + cfs.getFeedName());
         }
     }
 

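For reference, the start path in handleStartFeedStatement above condenses to the sketch below. The listener and job-submission calls mirror the patch; the standalone wrapper, its parameter list, and the omission of metadata transactions and locks are simplifying assumptions. Note the subscribe-before-submit ordering, which lets the translator block on sync() until the feed reports STARTED.

    // Illustrative sketch, not part of the commit. All listener/job calls
    // appear in handleStartFeedStatement above.
    import java.util.List;

    import org.apache.asterix.active.ActiveJobNotificationHandler;
    import org.apache.asterix.active.ActivityState;
    import org.apache.asterix.active.EntityId;
    import org.apache.asterix.active.IActiveEventSubscriber;
    import org.apache.asterix.common.metadata.IDataset;
    import org.apache.asterix.common.utils.JobUtils;
    import org.apache.asterix.external.feed.management.FeedEventsListener;
    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.JobSpecification;

    final class StartFeedSketch {
        static void startFeed(IHyracksClientConnection hcc, EntityId entityId, List<IDataset> datasets,
                Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo, boolean waitForCompletion)
                throws Exception {
            JobSpecification feedJob = jobInfo.getLeft();
            // One active-entity listener per feed, seeded with the connected
            // datasets and the intake locations chosen by the job builder.
            FeedEventsListener listener =
                    new FeedEventsListener(entityId, datasets, jobInfo.getRight().getLocations());
            ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
            // Subscribe before submitting so the STARTED transition is not missed.
            IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
            feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
            JobUtils.runJob(hcc, feedJob, waitForCompletion);
            eventSubscriber.sync(); // returns once the feed reports STARTED
        }
    }
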
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
new file mode 100644
index 0000000..f8b5496
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.utils;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
+import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.feeds.LocationConstraint;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.translator.CompiledStatements;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
+
+/**
+ * Provides helper methods for creating job specifications for operations on a feed.
+ */
+public class FeedOperations {
+
+    private FeedOperations() {
+    }
+
+    private static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
+            MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+        IAdapterFactory adapterFactory;
+        IOperatorDescriptor feedIngestor;
+        AlgebricksPartitionConstraint ingesterPc;
+        Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t =
+                metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor);
+        feedIngestor = t.first;
+        ingesterPc = t.second;
+        adapterFactory = t.third;
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
+        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
+        spec.addRoot(nullSink);
+        return Pair.of(spec, adapterFactory);
+    }
+
+    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws AsterixException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations();
+        Set<String> nodes = new TreeSet<>();
+        for (String node : allCluster.getLocations()) {
+            nodes.add(node);
+        }
+        AlgebricksAbsolutePartitionConstraint locations =
+                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()]));
+        FileSplit[] feedLogFileSplits =
+                FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
+        org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+                StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, spC.second);
+        spec.addRoot(frod);
+        return spec;
+    }
+
+    private static JobSpecification getConnectionJob(SessionConfig sessionConfig, MetadataProvider metadataProvider,
+            FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider,
+            IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
+            IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException {
+        DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedConnection.getDataverseName()));
+        FeedConnectionRequest fcr =
+                new FeedConnectionRequest(FeedRuntimeType.INTAKE, feedConnection.getAppliedFunctions(),
+                        feedConnection.getDatasetName(), feedConnection.getPolicyName(), feedConnection.getFeedId());
+        SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, fcr);
+        subscribeStmt.initialize(metadataProvider.getMetadataTxnContext());
+        List<Statement> statements = new ArrayList<>();
+        statements.add(dataverseDecl);
+        statements.add(subscribeStmt);
+        IStatementExecutor translator =
+                qtFactory.create(statements, sessionConfig, compilationProvider, storageComponentProvider);
+        // configure the metadata provider
+        metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
+        metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy());
+        metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
+                StringUtils.join(subscribeStmt.getLocations(), ','));
+
+        CompiledStatements.CompiledSubscribeFeedStatement csfs = new CompiledStatements.CompiledSubscribeFeedStatement(
+                subscribeStmt.getSubscriptionRequest(), subscribeStmt.getVarCounter());
+        return translator.rewriteCompileQuery(hcc, metadataProvider, subscribeStmt.getQuery(), csfs);
+    }
+
+    private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,
+            JobSpecification intakeJob, List<JobSpecification> jobsList, List<FeedConnection> feedConnections,
+            String[] intakeLocations) throws AlgebricksException, HyracksDataException {
+        JobSpecification jobSpec = new JobSpecification(intakeJob.getFrameSize());
+
+        // copy the ingestion (intake) operator into the combined job
+        FeedIntakeOperatorDescriptor firstOp =
+                (FeedIntakeOperatorDescriptor) intakeJob.getOperatorMap().get(new OperatorDescriptorId(0));
+        FeedIntakeOperatorDescriptor ingestionOp;
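+        // when no factory instance is present (e.g. an external library adapter), the
+        // operator is re-created from its library and factory class names; otherwise the
+        // existing adapter factory is reused directly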
+        if (firstOp.getAdaptorFactory() == null) {
+            ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryName(),
+                    firstOp.getAdaptorFactoryClassName(), firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
+                    firstOp.getOutputRecordDescriptors()[0]);
+        } else {
+            ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorFactory(),
+                    firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
+                    firstOp.getOutputRecordDescriptors()[0]);
+        }
+        // create a replicate operator to fan the intake output out to all connections
+        ReplicateOperatorDescriptor replicateOp =
+                new ReplicateOperatorDescriptor(jobSpec, ingestionOp.getOutputRecordDescriptors()[0], jobsList.size());
+        jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), ingestionOp, 0, replicateOp, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, ingestionOp, intakeLocations);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, replicateOp, intakeLocations);
+        // Loop over the jobs to copy operators and connections
+        Map<OperatorDescriptorId, OperatorDescriptorId> operatorIdMapping = new HashMap<>();
+        Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
+        Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
+        Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
+        List<JobId> jobIds = new ArrayList<>();
+
+        for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
+            FeedConnection curFeedConnection = feedConnections.get(iter1);
+            JobSpecification subJob = jobsList.get(iter1);
+            operatorIdMapping.clear();
+            Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap();
+
+            FeedPolicyEntity feedPolicyEntity =
+                    FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
+                            curFeedConnection.getPolicyName(), metadataProvider.getMetadataTxnContext());
+
+            for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) {
+                IOperatorDescriptor opDesc = entry.getValue();
+                OperatorDescriptorId oldId = opDesc.getOperatorId();
+                OperatorDescriptorId opId;
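+                // wrap primary-index storage operators in a feed meta operator so that the
+                // connection's feed policy is applied at the STORE runtime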
+                if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
+                        && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
+                    String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+                    FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                            new FeedConnectionId(ingestionOp.getEntityId(),
+                                    feedConnections.get(iter1).getDatasetName()),
+                            opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, false, operandId);
+                    opId = metaOp.getOperatorId();
+                    opDesc.setOperatorId(opId);
+                } else {
+                    if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
+                        AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor) opDesc;
+                        for (IPushRuntimeFactory runtimeFactory : algOp.getPipeline().getRuntimeFactories()) {
+                            if (runtimeFactory instanceof StreamSelectRuntimeFactory) {
+                                ((StreamSelectRuntimeFactory) runtimeFactory).retainMissing(true, 0);
+                            }
+                        }
+                    }
+                    opId = jobSpec.createOperatorDescriptorId(opDesc);
+                }
+                operatorIdMapping.put(oldId, opId);
+            }
+
+            // copy connectors
+            connectorIdMapping.clear();
+            for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : subJob.getConnectorMap().entrySet()) {
+                if (entry.getKey().getId() == 0) {
+                    // connector 0 fed the original collect operator; it is re-created below
+                    // as one-to-one connectors around the replicate operator
+                    continue;
+                }
+                IConnectorDescriptor connDesc = entry.getValue();
+                ConnectorDescriptorId newConnId;
+                if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
+                    MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
+                    connDesc = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
+                            m2nConn.getTuplePartitionComputerFactory());
+                    newConnId = connDesc.getConnectorId();
+                } else {
+                    newConnId = jobSpec.createConnectorDescriptor(connDesc);
+                }
+                connectorIdMapping.put(entry.getKey(), newConnId);
+            }
+
+            // make connections between operators
+            for (Entry<ConnectorDescriptorId,
+                    Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry :
+                          subJob.getConnectorOperatorMap().entrySet()) {
+                ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
+                IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
+                Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
+                Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
+                IOperatorDescriptor leftOpDesc = jobSpec.getOperatorMap().get(leftOp.getLeft().getOperatorId());
+                IOperatorDescriptor rightOpDesc = jobSpec.getOperatorMap().get(rightOp.getLeft().getOperatorId());
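+                // a collect operator attaches directly to the replicator: output port iter1
+                // of the replicate operator feeds this connection's pipeline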
+                if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) {
+                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp, iter1, leftOpDesc,
+                            leftOp.getRight());
+                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), leftOpDesc, leftOp.getRight(),
+                            rightOpDesc, rightOp.getRight());
+                } else {
+                    jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
+                }
+            }
+
+            // prepare for setting partition constraints
+            operatorLocations.clear();
+            operatorCounts.clear();
+
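+            // translate the sub-job's count/location constraints to the new operator ids
+            // (constraints on the sub-job's operator 0 are skipped)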
+            for (Constraint constraint : subJob.getUserConstraints()) {
+                LValueConstraintExpression lexpr = constraint.getLValue();
+                ConstraintExpression cexpr = constraint.getRValue();
+                OperatorDescriptorId opId;
+                switch (lexpr.getTag()) {
+                    case PARTITION_COUNT:
+                        opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                        if (opId.getId() == 0) {
+                            continue;
+                        }
+                        operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression) cexpr).getValue());
+                        break;
+                    case PARTITION_LOCATION:
+                        opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                        if (opId.getId() == 0) {
+                            continue;
+                        }
+                        IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
+                        List<LocationConstraint> locations =
+                                operatorLocations.computeIfAbsent(opDesc.getOperatorId(), k -> new ArrayList<>());
+                        String location = (String) ((ConstantExpression) cexpr).getValue();
+                        LocationConstraint lc =
+                                new LocationConstraint(location, ((PartitionLocationExpression) lexpr).getPartition());
+                        locations.add(lc);
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            // set absolute location constraints
+            for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
+                IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(entry.getKey());
+                // sort by partition id so that locations[j] below holds the location of partition j
+                Collections.sort(entry.getValue(), (o1, o2) -> o1.partition - o2.partition);
+                String[] locations = new String[entry.getValue().size()];
+                for (int j = 0; j < locations.length; ++j) {
+                    locations[j] = entry.getValue().get(j).location;
+                }
+                PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, opDesc, locations);
+            }
+
+            // set count constraints for operators that have no explicit location constraints
+            for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
+                IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(entry.getKey());
+                if (!operatorLocations.containsKey(entry.getKey())) {
+                    PartitionConstraintHelper.addPartitionCountConstraint(jobSpec, opDesc, entry.getValue());
+                }
+            }
+            // re-register the sub-job's roots in the combined job
+            for (OperatorDescriptorId root : subJob.getRoots()) {
+                jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
+            }
+            jobIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getJobId());
+        }
+
+        // a multi-transaction joblet event listener commits/aborts the transactions of all
+        // constituent connection jobs together
+        jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(jobIds, true));
+        // inherit connector scheduling and assignment policies from the first sub-job
+        jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
+        jobSpec.setConnectorPolicyAssignmentPolicy(jobsList.get(0).getConnectorPolicyAssignmentPolicy());
+        return jobSpec;
+    }
+
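+    /**
+     * Builds the combined job for starting a feed: compiles the intake job, compiles one
+     * collect job per feed connection, and merges them with combineIntakeCollectJobs.
+     * Returns the combined job together with the intake partition constraint.
+     */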
+    public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
+            SessionConfig sessionConfig, MetadataProvider metadataProvider, Feed feed,
+            List<FeedConnection> feedConnections, ILangCompilationProvider compilationProvider,
+            IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
+            IHyracksClientConnection hcc) throws Exception {
+        FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
+        // TODO: Change the default Datasource to use all possible partitions
+        Pair<JobSpecification, IAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
+        // TODO: Add feed policy accessor
+        List<JobSpecification> jobsList = new ArrayList<>();
+        // extract the intake job and the ingestion adapter's partition locations
+        JobSpecification intakeJob = intakeInfo.getLeft();
+        IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
+        String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
+        // compile one collect job per feed connection
+        for (FeedConnection feedConnection : feedConnections) {
+            JobSpecification connectionJob = getConnectionJob(sessionConfig, metadataProvider, feedConnection,
+                    ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc);
+            jobsList.add(connectionJob);
+        }
+        return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections,
+                ingestionLocations), intakeInfo.getRight().getPartitionConstraint());
+    }
+
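+    /**
+     * Sends a stop-activity message to the active manager on the given intake node,
+     * targeting the feed's intake runtime for the given partition.
+     */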
+    public static void sendStopMessageToNode(EntityId feedId, String intakeNodeLocation, Integer partition)
+            throws Exception {
+        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "SRC",
+                new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
+        sendActiveMessage(stopFeedMessage, intakeNodeLocation);
+    }
+
+    private static void sendActiveMessage(ActiveManagerMessage activeManagerMessage, String nodeId) throws Exception {
+        ICCMessageBroker messageBroker =
+                (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+        messageBroker.sendApplicationMessageToNC(activeManagerMessage, nodeId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm
index 390a955..cdaaf1b 100644
--- a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm
@@ -1,13 +1,14 @@
-{ "DataverseName": "Metadata", "DatasetName": "CompactionPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 13, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Dataset", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 2, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "DatasourceAdapter", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 8, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Datatype", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 3, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Dataverse", "DatatypeDataverseName": "Metadata", "DatatypeName": "DataverseRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ] ], "PrimaryKey": [ [ "DataverseName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 1, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "ExternalFile", "DatatypeDataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 14, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Feed", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "FeedName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "FeedName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 10, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "FeedPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 12, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Function", "DatatypeDataverseName": "Metadata", "DatatypeName": "FunctionRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 7, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Index", "DatatypeDataverseName": "Metadata", "DatatypeName": "IndexRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 4, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Library", "DatatypeDataverseName": "Metadata", "DatatypeName": "LibraryRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 9, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Node", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "NodeName" ] ], "PrimaryKey": [ [ "NodeName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 5, "PendingOp": 0 }
-{ "DataverseName": "Metadata", "DatasetName": "Nodegroup", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeGroupRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "GroupName" ] ], "PrimaryKey": [ [ "GroupName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 6, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "CompactionPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 13, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Dataset", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 2, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "DatasourceAdapter", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 8, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Datatype", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 3, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Dataverse", "DatatypeDataverseName": "Metadata", "DatatypeName": "DataverseRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ] ], "PrimaryKey": [ [ "DataverseName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 1, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "ExternalFile", "DatatypeDataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 14, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Feed", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "FeedName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "FeedName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 10, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "FeedConnection", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "FeedName" ], [ "DatasetName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "FeedName" ], [ "DatasetName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 11, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "FeedPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 12, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Function", "DatatypeDataverseName": "Metadata", "DatatypeName": "FunctionRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 7, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Index", "DatatypeDataverseName": "Metadata", "DatatypeName": "IndexRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 4, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Library", "DatatypeDataverseName": "Metadata", "DatatypeName": "LibraryRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 9, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Node", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "NodeName" ] ], "PrimaryKey": [ [ "NodeName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 5, "PendingOp": 0 }
+{ "DataverseName": "Metadata", "DatasetName": "Nodegroup", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeGroupRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "GroupName" ] ], "PrimaryKey": [ [ "GroupName" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 6, "PendingOp": 0 }

