asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/9] asterixdb git commit: Feed Connection Refactoring
Date Sun, 19 Feb 2017 07:14:49 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 5f478c3..ab8c8f7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -28,7 +28,6 @@ import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -45,6 +44,8 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescri
  */
 public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
+    private static final String FEED_EXTENSION_NAME = "Feed";
+
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
@@ -53,29 +54,23 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
     private final EntityId feedId;
 
     private final FeedPolicyAccessor policyAccessor;
-
+    private final ARecordType adapterOutputType;
     /** The adaptor factory that is used to create an instance of the feed adaptor **/
     private IAdapterFactory adaptorFactory;
-
     /** The library that contains the adapter in use. **/
     private String adaptorLibraryName;
-
     /**
      * The adapter factory class that is used to create an instance of the feed adapter.
      * This value is used only in the case of external adapters.
      **/
     private String adaptorFactoryClassName;
-
     /** The configuration parameters associated with the adapter. **/
     private Map<String, String> adaptorConfiguration;
 
-    private final ARecordType adapterOutputType;
-
     public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
             ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
         super(spec, 0, 1);
-        this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(),
-                primaryFeed.getFeedName());
+        this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName());
         this.adaptorFactory = adapterFactory;
         this.adapterOutputType = adapterOutputType;
         this.policyAccessor = policyAccessor;
@@ -86,8 +81,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
             String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor,
             RecordDescriptor rDesc) {
         super(spec, 0, 1);
-        this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(),
-                primaryFeed.getFeedName());
+        this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName());
         this.adaptorFactoryClassName = adapterFactoryClassName;
         this.adaptorLibraryName = adapterLibraryName;
         this.adaptorConfiguration = primaryFeed.getAdapterConfiguration();
@@ -108,8 +102,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
 
     private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
         IAdapterFactory adapterFactory;
-        IAppRuntimeContext runtimeCtx =
-                (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
         ILibraryManager libraryManager = runtimeCtx.getLibraryManager();
         ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName);
         if (classLoader != null) {
@@ -130,8 +124,32 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
         return adapterFactory;
     }
 
-    public EntityId getFeedId() {
+    public EntityId getEntityId() {
         return feedId;
     }
 
+    public IAdapterFactory getAdaptorFactory() {
+        return this.adaptorFactory;
+    }
+
+    public void setAdaptorFactory(IAdapterFactory factory) {
+        this.adaptorFactory = factory;
+    }
+
+    public ARecordType getAdapterOutputType() {
+        return this.adapterOutputType;
+    }
+
+    public FeedPolicyAccessor getPolicyAccessor() {
+        return this.policyAccessor;
+    }
+
+    public String getAdaptorLibraryName() {
+        return this.adaptorLibraryName;
+    }
+
+    public String getAdaptorFactoryClassName() {
+        return this.adaptorFactoryClassName;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index f58e9e5..99fff19 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,84 +18,58 @@
  */
 package org.apache.asterix.external.operators;
 
-import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
-import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
 /**
  * The runtime for @see{FeedIntakeOperationDescriptor}.
  * Provides the core functionality to set up the artifacts for ingestion of a feed.
  * The artifacts are lazily activated when a feed receives a subscription request.
  */
-public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
 
-    private final EntityId feedId;
     private final int partition;
-    private final IHyracksTaskContext ctx;
     private final IAdapterFactory adapterFactory;
     private final FeedIntakeOperatorDescriptor opDesc;
+    private volatile AdapterRuntimeManager adapterRuntimeManager;
 
     public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
             int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider,
             FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
+        super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         this.opDesc = feedIntakeOperatorDescriptor;
         this.recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0);
-        this.ctx = ctx;
-        this.feedId = feedId;
         this.partition = partition;
         this.adapterFactory = adapterFactory;
     }
 
     @Override
-    public void initialize() throws HyracksDataException {
-        ActiveManager feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getActiveManager();
-        AdapterRuntimeManager adapterRuntimeManager = null;
-        DistributeFeedFrameWriter frameDistributor = null;
-        IngestionRuntime ingestionRuntime = null;
-        boolean open = false;
+    protected void start() throws HyracksDataException, InterruptedException {
+        writer.open();
         try {
             Thread.currentThread().setName("Intake Thread");
-            // create the adapter
             FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
-            // create the distributor
-            frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition);
-            // create adapter runtime manager
-            adapterRuntimeManager = new AdapterRuntimeManager(ctx, feedId, adapter, frameDistributor, partition);
-            // create and register the runtime
-            ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition);
-            ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);
-            feedManager.registerRuntime(ingestionRuntime);
-            // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-            ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
-            // open the distributor
-            open = true;
-            frameDistributor.open();
-            // wait until ingestion is over
+            adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
+            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+            adapterRuntimeManager.start();
             synchronized (adapterRuntimeManager) {
                 while (!adapterRuntimeManager.isDone()) {
                     adapterRuntimeManager.wait();
                 }
             }
-            // The ingestion is over. we need to remove the runtime from the manager
-            feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
-            // If there was a failure, we need to throw an exception
             if (adapterRuntimeManager.isFailed()) {
                 throw new RuntimeDataException(
                         ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
@@ -106,15 +80,16 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
              * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
              * The surviving intake partitions must continue to live and receive data from the external source.
              */
-            if (ingestionRuntime != null) {
-                ingestionRuntime.terminate();
-                feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
-            }
             throw new HyracksDataException(ie);
         } finally {
-            if (open) {
-                frameDistributor.close();
-            }
+                writer.close();
+        }
+    }
+
+    @Override
+    protected void abort() throws HyracksDataException, InterruptedException {
+        if (adapterRuntimeManager != null) {
+            adapterRuntimeManager.stop();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
deleted file mode 100644
index 61451b1..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.operators;
-
-import org.apache.asterix.active.IActiveMessage;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * @deprecated
- *             Sends a control message to the registered message queue for feed specified by its feedId.
- *             For messaging, use IMessageBroker interfaces
- */
-@Deprecated
-public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final IActiveMessage feedMessage;
-
-    public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
-            IActiveMessage feedMessage) {
-        super(spec, 0, 1);
-        this.connectionId = connectionId;
-        this.feedMessage = feedMessage;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
deleted file mode 100644
index b273325..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.operators;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveMessage;
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.EndFeedMessage;
-import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
-import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * @deprecated
- *             Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
- *             a feed message to the local feed manager on the host node controller.
- *             For messages, use IMessageBroker interfaces
- * @see FeedMessageOperatorDescriptor
- *      IFeedMessage
- *      IFeedManager
- */
-@Deprecated
-public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
-
-    private final FeedConnectionId connectionId;
-    private final IActiveMessage message;
-    private final ActiveManager feedManager;
-    private final int partition;
-
-    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
-            IActiveMessage feedMessage, int partition) {
-        this.connectionId = connectionId;
-        this.message = feedMessage;
-        this.partition = partition;
-        IAppRuntimeContext runtimeCtx =
-                (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
-        this.feedManager = (ActiveManager) runtimeCtx.getActiveManager();
-    }
-
-    @Override
-    public void initialize() throws HyracksDataException {
-        try {
-            writer.open();
-            switch (message.getMessageType()) {
-                case END:
-                    EndFeedMessage endFeedMessage = (EndFeedMessage) message;
-                    switch (endFeedMessage.getEndMessageType()) {
-                        case DISCONNECT_FEED:
-                            hanldeDisconnectFeedTypeMessage(endFeedMessage);
-                            break;
-                        case DISCONTINUE_SOURCE:
-                            handleDiscontinueFeedTypeMessage(endFeedMessage);
-                            break;
-                        default:
-                            break;
-                    }
-                    break;
-                default:
-                    break;
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        } finally {
-            writer.close();
-        }
-    }
-
-    private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
-        EntityId sourceFeedId = endFeedMessage.getSourceFeedId();
-        ActiveRuntimeId subscribableRuntimeId =
-                new ActiveRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE.toString(), partition);
-        ISubscribableRuntime feedRuntime = (ISubscribableRuntime) feedManager.getRuntime(subscribableRuntimeId);
-        AdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
-        adapterRuntimeManager.stop();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
-        }
-    }
-
-    private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
-        }
-        ActiveRuntimeId runtimeId;
-        FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
-        if (endFeedMessage.isCompleteDisconnection()) {
-            // subscribableRuntimeType represents the location at which the feed connection receives
-            // data
-            FeedRuntimeType runtimeType;
-            switch (subscribableRuntimeType) {
-                case INTAKE:
-                    runtimeType = FeedRuntimeType.COLLECT;
-                    break;
-                case COMPUTE:
-                    runtimeType = FeedRuntimeType.COMPUTE_COLLECT;
-                    break;
-                default:
-                    throw new RuntimeDataException(
-                            ErrorCode.OPERATORS_FEED_MSG_OPERATOR_NODE_PUSHABLE_INVALID_SUBSCRIBABLE_RUNTIME,
-                            subscribableRuntimeType);
-            }
-
-            runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType.toString(), partition);
-            CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getRuntime(runtimeId);
-            if (feedRuntime != null) {
-                feedRuntime.getSourceRuntime().unsubscribe(feedRuntime);
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
-            }
-        } else {
-            // subscribaleRuntimeType represents the location for data hand-off in presence of
-            // subscribers
-            switch (subscribableRuntimeType) {
-                case INTAKE:
-                    // illegal state as data hand-off from one feed to another does not happen at
-                    // intake
-                    throw new RuntimeDataException(
-                            ErrorCode.OPERATORS_FEED_MSG_OPERATOR_NODE_PUSHABLE_INVALID_SUBSCRIBABLE_RUNTIME,
-                            subscribableRuntimeType);
-                case COMPUTE:
-                    // feed could be primary or secondary, doesn't matter
-                    ActiveRuntimeId feedSubscribableRuntimeId = new ActiveRuntimeId(connectionId.getFeedId(),
-                            FeedRuntimeType.COMPUTE.toString(), partition);
-                    ISubscribableRuntime feedRuntime =
-                            (ISubscribableRuntime) feedManager.getRuntime(feedSubscribableRuntimeId);
-                    runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(),
-                            FeedRuntimeType.COMPUTE_COLLECT.toString(), partition);
-                    CollectionRuntime feedCollectionRuntime = (CollectionRuntime) feedManager.getRuntime(runtimeId);
-                    feedRuntime.unsubscribe(feedCollectionRuntime);
-                    break;
-                default:
-                    break;
-            }
-
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Unsubscribed from feed :" + connectionId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ec0399..b794ee1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -44,6 +44,8 @@ import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FeedUtils {
 
+    public static final String FEED_EXTENSION_NAME = "Feed";
+
     public enum JobType {
         INTAKE,
         FEED_CONNECT

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 171d271..d407b8a 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -72,7 +73,7 @@ public class InputHandlerTest extends TestCase {
     private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
             FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
         FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
-        EntityId feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, DATAVERSE, FEED);
+        EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, DATAVERSE, FEED);
         FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
         ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0);
         return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
index 39a6272..40aec53 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -30,3 +30,5 @@ use dataverse externallibtest;
 set wait-for-completion-feed "true";
 
 connect feed TestTypedAdapterFeed to dataset TweetsTestAdapter;
+
+start feed TestTypedAdapterFeed;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
index dbe3cfa..5f3d322 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013
@@ -48,8 +48,7 @@ create feed TweetFeed
 using localfs
 (("type-name"="TweetInputType"),
 ("path"="asterix_nc1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),
-("format"="adm"))
-apply function testlib#parseTweet;
+("format"="adm"));
 
 create dataset TweetsFeedIngest(TweetOutputType)
 primary key id;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
index d5a6f58..9642992 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013
@@ -29,4 +29,7 @@ use dataverse externallibtest;
 
 set wait-for-completion-feed "true";
 
-connect feed TweetFeed to dataset TweetsFeedIngest;
+connect feed TweetFeed to dataset TweetsFeedIngest
+apply function testlib#parseTweet;
+
+start feed TweetFeed;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql
index e4e1b45..8879fa8 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql
@@ -19,8 +19,8 @@
 /*
  * Description  : Create a feed dataset that uses the feed simulator adapter.
                   The feed simulator simulates feed from a file in the local fs.
-                  Associate with the feed an external user-defined function. The UDF 
-                  finds topics in each tweet. A topic is identified by a #. 
+                  Associate with the feed an external user-defined function. The UDF
+                  finds topics in each tweet. A topic is identified by a #.
                   Begin ingestion and apply external user defined function
  * Expected Res : Success
  * Date         : 23rd Apr 2013

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql
index 58dea6b..1e05e37 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql
@@ -25,3 +25,5 @@
 
 use dataverse twitter;
 connect feed MessageFeed to dataset ds_tweet;
+
+start feed MessageFeed;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql
index cf520ca..fe30266 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql
@@ -33,3 +33,4 @@ create feed TweetFeed using localfs
 );
 set wait-for-completion-feed "true";
 connect feed TweetFeed to dataset ds_tweet;
+start feed TweetFeed;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql
index 860b8ed..8cb5e07 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql
@@ -24,4 +24,4 @@
  */
 use dataverse twitter;
 set wait-for-completion-feed "false";
-connect feed TweetFeed to dataset ds_tweet;
+start feed TweetFeed;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql b/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql
index 7faf013..d9b1230 100644
--- a/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql
+++ b/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql
@@ -24,4 +24,5 @@
 use dataverse KeyVerse;
 
 set wait-for-completion-feed "true";
-connect feed KVChangeStream to dataset KVStore;
\ No newline at end of file
+connect feed KVChangeStream to dataset KVStore;
+start feed KVChangeStream;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
index c31da8b..2975e63 100644
--- a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
+++ b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
@@ -1 +1 @@
-804
\ No newline at end of file
+788
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 1d0d962..4596054 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -32,6 +32,7 @@ import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
@@ -44,6 +45,7 @@ import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -54,14 +56,14 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
  */
 public class SubscribeFeedStatement implements Statement {
 
+    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+    private static final Integer INSERT_STATEMENT_POS = 3;
     private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
-    private final FeedConnectionRequest connectionRequest;
-    private Query query;
     private final int varCounter;
     private final String[] locations;
-
-    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+    private final FeedConnectionRequest connectionRequest;
     private final IParserFactory parserFactory = new AQLParserFactory();
+    private Query query;
 
     public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) {
         this.connectionRequest = subscriptionRequest;
@@ -71,7 +73,7 @@ public class SubscribeFeedStatement implements Statement {
 
     public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
         this.query = new Query(false);
-        EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
+        EntityId sourceFeedId = connectionRequest.getReceivingFeedId();
         Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
                 connectionRequest.getReceivingFeedId().getDataverse(),
                 connectionRequest.getReceivingFeedId().getEntityName());
@@ -80,18 +82,6 @@ public class SubscribeFeedStatement implements Statement {
         }
 
         String feedOutputType = getOutputType(mdTxnCtx);
-        FunctionSignature appliedFunction = subscriberFeed.getAppliedFunction();
-        Function function = null;
-        if (appliedFunction != null) {
-            function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
-            if (function == null) {
-                throw new MetadataException(" Unknown function " + appliedFunction);
-            } else if (function.getParams().size() > 1) {
-                throw new MetadataException(
-                        " Incompatible function: " + appliedFunction + " Number if arguments must be 1");
-            }
-        }
-
         StringBuilder builder = new StringBuilder();
         builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
         builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
@@ -105,14 +95,15 @@ public class SubscribeFeedStatement implements Statement {
                 + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
                 + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")");
 
-        List<String> functionsToApply = connectionRequest.getFunctionsToApply();
+        List<FunctionSignature> functionsToApply = connectionRequest.getFunctionsToApply();
         if ((functionsToApply != null) && functionsToApply.isEmpty()) {
             builder.append(" return $x");
         } else {
+            Function function;
             String rValueName = "x";
             String lValueName = "y";
             int variableIndex = 0;
-            for (String functionName : functionsToApply) {
+            for (FunctionSignature appliedFunction : functionsToApply) {
                 function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
                 variableIndex++;
                 switch (function.getLanguage().toUpperCase()) {
@@ -122,8 +113,8 @@ public class SubscribeFeedStatement implements Statement {
                         builder.append("\n");
                         break;
                     case Function.LANGUAGE_JAVA:
-                        builder.append(" let " + "$" + lValueName + variableIndex + ":=" + functionName + "(" + "$"
-                                + rValueName + ")");
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=" + function.getName() + "("
+                                + "$" + rValueName + ")");
                         rValueName = lValueName + variableIndex;
                         break;
                 }
@@ -141,7 +132,7 @@ public class SubscribeFeedStatement implements Statement {
         List<Statement> statements;
         try {
             statements = parser.parse();
-            query = ((InsertStatement) statements.get(3)).getQuery();
+            query = ((InsertStatement) statements.get(INSERT_STATEMENT_POS)).getQuery();
         } catch (CompilationException pe) {
             throw new MetadataException(pe);
         }
@@ -179,21 +170,13 @@ public class SubscribeFeedStatement implements Statement {
     }
 
     private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException {
-        String outputType = null;
+        String outputType;
         EntityId feedId = connectionRequest.getReceivingFeedId();
         Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName());
-        FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters());
         try {
-            switch (feed.getFeedType()) {
-                case PRIMARY:
-                    outputType = FeedMetadataUtil
-                            .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME)
-                            .getTypeName();
-                    break;
-                case SECONDARY:
-                    outputType = FeedMetadataUtil.getSecondaryFeedOutput(feed, policyAccessor, mdTxnCtx);
-                    break;
-            }
+            outputType = FeedMetadataUtil
+                    .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME)
+                    .getTypeName();
             return outputType;
 
         } catch (AlgebricksException | RemoteException | ACIDException ae) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index e5af86f..f4cdfb8 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -98,10 +98,10 @@ import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -702,26 +702,15 @@ CreateFeedStatement FeedSpecification() throws ParseException:
   boolean ifNotExists = false;
   String adapterName = null;
   Map<String,String> properties = null;
-  FunctionSignature appliedFunction = null;
   CreateFeedStatement cfs = null;
   Pair<Identifier,Identifier> sourceNameComponents = null;
 
 }
 {
-  (
-    <SECONDARY> <FEED>  nameComponents = QualifiedName() ifNotExists = IfNotExists()
-    <FROM> <FEED> sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())?
-    {
-      cfs = new CreateSecondaryFeedStatement(nameComponents, sourceNameComponents, appliedFunction, ifNotExists);
-    }
-   |
-    (<PRIMARY>)? <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists()
-    <USING> adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?
-     {
-      cfs = new CreatePrimaryFeedStatement(nameComponents, adapterName, properties, appliedFunction, ifNotExists);
-     }
-  )
+  <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists()
+  <USING> adapterName = AdapterName() properties = Configuration()
   {
+    cfs = new CreateFeedStatement(nameComponents, adapterName, properties, ifNotExists);
     return cfs;
   }
 }
@@ -1157,19 +1146,29 @@ Statement FeedStatement() throws ParseException:
   Pair<Identifier,Identifier> datasetNameComponents = null;
 
   Map<String,String> configuration = null;
+  FunctionSignature appliedFunction = null;
   Statement stmt = null;
   String policy = null;
 }
 {
   (
-    <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName() (policy = GetPolicy())?
+    <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName()
+    (appliedFunction = ApplyFunction())? (policy = GetPolicy())?
       {
-        stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, policy, getVarCounter());
+        stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunction, policy, getVarCounter());
       }
     | <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> <DATASET> datasetNameComponents = QualifiedName()
       {
         stmt = new DisconnectFeedStatement(feedNameComponents, datasetNameComponents);
       }
+    | <START> <FEED> feedNameComponents = QualifiedName()
+      {
+        stmt = new StartFeedStatement (feedNameComponents);
+      }
+    | <STOP> <FEED> feedNameComponents = QualifiedName()
+      {
+        stmt = new StopFeedStatement (feedNameComponents);
+      }
   )
     {
       return stmt;
@@ -2706,6 +2705,8 @@ TOKEN :
   | <SECONDARY : "secondary">
   | <SELECT : "select">
   | <SET : "set">
+  | <START: "start">
+  | <STOP: "stop">
   | <SOME : "some">
   | <TEMPORARY : "temporary">
   | <THEN : "then">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 06fbf33..612b230 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -79,12 +79,12 @@ public interface Statement extends ILangExpression {
         public static final byte INDEX_DECL = 0x13;
         public static final byte CREATE_DATAVERSE = 0x14;
         public static final byte INDEX_DROP = 0x15;
-        public static final byte CREATE_PRIMARY_FEED = 0x16;
-        public static final byte CREATE_SECONDARY_FEED = 0x17;
-        public static final byte DROP_FEED = 0x18;
-        public static final byte CONNECT_FEED = 0x19;
-        public static final byte DISCONNECT_FEED = 0x1a;
-        public static final byte SUBSCRIBE_FEED = 0x1b;
+        public static final byte CREATE_FEED = 0x16;
+        public static final byte DROP_FEED = 0x17;
+        public static final byte START_FEED = 0x18;
+        public static final byte STOP_FEED = 0x19;
+        public static final byte CONNECT_FEED = 0x1a;
+        public static final byte DISCONNECT_FEED = 0x1b;
         public static final byte CREATE_FEED_POLICY = 0x1c;
         public static final byte DROP_FEED_POLICY = 0x1d;
         public static final byte CREATE_FUNCTION = 0x1e;
@@ -93,6 +93,7 @@ public interface Statement extends ILangExpression {
         public static final byte EXTERNAL_DATASET_REFRESH = 0x21;
         public static final byte RUN = 0x22;
         public static final byte EXTENSION = 0x23;
+        public static final byte SUBSCRIBE_FEED = 0x24;
 
         private Kind() {
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index ceab6e9..0bd34ee 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -19,26 +19,28 @@
 package org.apache.asterix.lang.common.statement;
 
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class ConnectFeedStatement implements Statement {
 
     private final Identifier dataverseName;
     private final Identifier datasetName;
     private final String feedName;
     private final String policy;
-    private Query query;
     private int varCounter;
-    private boolean forceConnect = false;
-
-    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+    private final ArrayList<FunctionSignature> appliedFunctions;
 
     public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, Pair<Identifier, Identifier> datasetNameCmp,
-            String policy, int varCounter) {
+            FunctionSignature appliedFunction, String policy, int varCounter) {
+        appliedFunctions = new ArrayList<>();
         if (feedNameCmp.first != null && datasetNameCmp.first != null
                 && !feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) {
             throw new IllegalArgumentException("Dataverse for source feed and target dataset do not match");
@@ -49,15 +51,9 @@ public class ConnectFeedStatement implements Statement {
         this.feedName = feedNameCmp.second.getValue();
         this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
         this.varCounter = varCounter;
-    }
-
-    public ConnectFeedStatement(Identifier dataverseName, Identifier feedName, Identifier datasetName, String policy,
-            int varCounter) {
-        this.dataverseName = dataverseName;
-        this.datasetName = datasetName;
-        this.feedName = feedName.getValue();
-        this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
-        this.varCounter = varCounter;
+        if (appliedFunction != null) {
+            this.appliedFunctions.add(appliedFunction);
+        }
     }
 
     public Identifier getDataverseName() {
@@ -68,10 +64,6 @@ public class ConnectFeedStatement implements Statement {
         return datasetName;
     }
 
-    public Query getQuery() {
-        return query;
-    }
-
     public int getVarCounter() {
         return varCounter;
     }
@@ -90,18 +82,14 @@ public class ConnectFeedStatement implements Statement {
         return visitor.visit(this, arg);
     }
 
-    public boolean forceConnect() {
-        return forceConnect;
-    }
-
-    public void setForceConnect(boolean forceConnect) {
-        this.forceConnect = forceConnect;
-    }
-
     public String getFeedName() {
         return feedName;
     }
 
+    public List<FunctionSignature> getAppliedFunctions() {
+        return appliedFunctions;
+    }
+
     @Override
     public byte getCategory() {
         return Category.UPDATE;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
index 56e7d33..1e7a182 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
@@ -23,19 +23,28 @@ import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
-public abstract class CreateFeedStatement implements Statement {
+import java.util.Map;
+
+/**
+ * The new create feed statement only concerns the feed adaptor configuration.
+ * All feeds are considered as primary feeds.
+ */
+public class CreateFeedStatement implements Statement {
 
     private final Pair<Identifier, Identifier> qName;
-    private final FunctionSignature appliedFunction;
     private final boolean ifNotExists;
+    private final String adaptorName;
+    private final Map<String, String> adaptorConfiguration;
 
-    public CreateFeedStatement(Pair<Identifier, Identifier> qName, FunctionSignature appliedFunction,
-            boolean ifNotExists) {
+    public CreateFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName,
+            Map<String, String> adaptorConfiguration, boolean ifNotExists) {
         this.qName = qName;
-        this.appliedFunction = appliedFunction;
         this.ifNotExists = ifNotExists;
+        this.adaptorName = adaptorName;
+        this.adaptorConfiguration = adaptorConfiguration;
     }
 
     public Identifier getDataverseName() {
@@ -46,16 +55,27 @@ public abstract class CreateFeedStatement implements Statement {
         return qName.second;
     }
 
-    public FunctionSignature getAppliedFunction() {
-        return appliedFunction;
-    }
-
     public boolean getIfNotExists() {
         return this.ifNotExists;
     }
 
+    public String getAdaptorName() {
+        return adaptorName;
+    }
+
+    public Map<String, String> getAdaptorConfiguration() {
+        return adaptorConfiguration;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.CREATE_FEED;
+    }
+
     @Override
-    public abstract <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException;
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
 
     @Override
     public byte getCategory() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java
deleted file mode 100644
index c584ed0..0000000
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.lang.common.statement;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-
-public class CreatePrimaryFeedStatement extends CreateFeedStatement {
-
-    private final String adaptorName;
-    private final Map<String, String> adaptorConfiguration;
-
-    public CreatePrimaryFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName,
-            Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
-        super(qName, appliedFunction, ifNotExists);
-        this.adaptorName = adaptorName;
-        this.adaptorConfiguration = adaptorConfiguration;
-    }
-
-    public String getAdaptorName() {
-        return adaptorName;
-    }
-
-    public Map<String, String> getAdaptorConfiguration() {
-        return adaptorConfiguration;
-    }
-
-    @Override
-    public byte getKind() {
-        return Statement.Kind.CREATE_PRIMARY_FEED;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
-        return visitor.visit(this, arg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java
deleted file mode 100644
index 241bcd8..0000000
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.lang.common.statement;
-
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-
-/**
- * Represents the AQL statement for creating a secondary feed.
- * A secondary feed is one that derives its data from another (primary/secondary) feed.
- */
-public class CreateSecondaryFeedStatement extends CreateFeedStatement {
-
-    /** The source feed that provides data for this secondary feed. */
-    private final Pair<Identifier, Identifier> sourceQName;
-
-    public CreateSecondaryFeedStatement(Pair<Identifier, Identifier> qName, Pair<Identifier, Identifier> sourceQName,
-            FunctionSignature appliedFunction, boolean ifNotExists) {
-        super(qName, appliedFunction, ifNotExists);
-        this.sourceQName = sourceQName;
-    }
-
-    public String getSourceFeedDataverse() {
-        return sourceQName.first != null ? sourceQName.first.toString()
-                : getDataverseName() != null ? getDataverseName().getValue() : null;
-    }
-
-    public String getSourceFeedName() {
-        return sourceQName.second != null ? sourceQName.second.toString() : null;
-    }
-
-    @Override
-    public byte getKind() {
-        return Statement.Kind.CREATE_SECONDARY_FEED;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
-        return visitor.visit(this, arg);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
new file mode 100644
index 0000000..b3452b5
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class StartFeedStatement implements Statement {
+
+    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+    private Identifier dataverseName;
+    private Identifier feedName;
+
+    public StartFeedStatement(Pair<Identifier, Identifier> feedNameComp) {
+        dataverseName = feedNameComp.first;
+        feedName = feedNameComp.second;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.START_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getFeedName() {
+        return feedName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java
new file mode 100644
index 0000000..c45933e
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
+public class StopFeedStatement implements Statement {
+
+    private final Identifier dataverseName;
+    private final Identifier feedName;
+
+    public StopFeedStatement(Pair<Identifier, Identifier> feedNameComp) {
+        this.dataverseName = feedNameComp.first;
+        this.feedName = feedNameComp.second;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.STOP_FEED;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getFeedName() {
+        return feedName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index eefed9d..35d0a29 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -60,14 +60,14 @@ import org.apache.asterix.lang.common.expression.TypeReferenceExpression;
 import org.apache.asterix.lang.common.expression.UnaryExpr;
 import org.apache.asterix.lang.common.expression.UnorderedListTypeDefinition;
 import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.IntegerLiteral;
 import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
+import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -87,6 +87,8 @@ import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.statement.TypeDecl;
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.UpdateStatement;
@@ -107,14 +109,13 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> {
     private final static String CREATE = "create ";
     private final static String FEED = " feed ";
     private final static String DEFAULT_DATAVERSE_FORMAT = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
+    private final PrintWriter out;
     protected Set<Character> validIdentifierChars = new HashSet<Character>();
     protected Set<Character> validIdentifierStartChars = new HashSet<Character>();
     protected String dataverseSymbol = " dataverse ";
     protected String datasetSymbol = " dataset ";
     protected String assignSymbol = ":=";
 
-    private final PrintWriter out;
-
     public FormatPrintVisitor() {
         this(new PrintWriter(System.out));
     }
@@ -747,35 +748,36 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> {
         if (connectFeedStmt.getPolicy() != null) {
             out.print(" using policy " + revertStringToQuoted(connectFeedStmt.getPolicy()));
         }
+        if (connectFeedStmt.getAppliedFunctions() != null) {
+            out.print(" apply function " + connectFeedStmt.getAppliedFunctions());
+        }
         out.println(SEMICOLON);
         return null;
     }
 
     @Override
-    public Void visit(CreatePrimaryFeedStatement cpfs, Integer step) throws CompilationException {
-        out.print(skip(step) + CREATE + " primary feed ");
-        out.print(generateFullName(cpfs.getDataverseName(), cpfs.getFeedName()));
-        out.print(generateIfNotExists(cpfs.getIfNotExists()));
-        out.print(" using " + cpfs.getAdaptorName() + " ");
-        printConfiguration(cpfs.getAdaptorConfiguration());
-        FunctionSignature func = cpfs.getAppliedFunction();
-        if (func != null) {
-            out.print(" apply function " + generateFullName(func.getNamespace(), func.getName()));
-        }
+    public Void visit(CreateFeedStatement cfs, Integer step) throws CompilationException {
+        out.print(skip(step) + "create " + FEED);
+        out.print(generateFullName(cfs.getDataverseName(), cfs.getFeedName()));
+        out.print(generateIfNotExists(cfs.getIfNotExists()));
+        out.print(" using " + cfs.getAdaptorName() + " ");
+        printConfiguration(cfs.getAdaptorConfiguration());
         out.println(SEMICOLON);
         return null;
     }
 
     @Override
-    public Void visit(CreateSecondaryFeedStatement csfs, Integer step) throws CompilationException {
-        out.print(skip(step) + CREATE + " secondary feed ");
-        out.print(generateFullName(csfs.getDataverseName(), csfs.getFeedName()));
-        out.print(generateIfNotExists(csfs.getIfNotExists()));
-        out.print(" from feed " + generateFullName(csfs.getSourceFeedDataverse(), csfs.getSourceFeedName()));
-        FunctionSignature func = csfs.getAppliedFunction();
-        if (func != null) {
-            out.print(" apply function " + generateFullName(func.getNamespace(), func.getName()));
-        }
+    public Void visit(StartFeedStatement startFeedStatement, Integer step) throws CompilationException {
+        out.print(skip(step) + "start " + FEED);
+        out.print(generateFullName(startFeedStatement.getDataverseName(), startFeedStatement.getFeedName()));
+        out.println(SEMICOLON);
+        return null;
+    }
+
+    @Override
+    public Void visit(StopFeedStatement stopFeedStatement, Integer step) throws CompilationException {
+        out.print(skip(step) + "stop " + FEED);
+        out.print(generateFullName(stopFeedStatement.getDataverseName(), stopFeedStatement.getFeedName()));
         out.println(SEMICOLON);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
index a4868d0..117fa77 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
@@ -28,10 +28,9 @@ import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
+import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -47,6 +46,8 @@ import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
 import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.statement.TypeDecl;
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.UpdateStatement;
@@ -180,12 +181,17 @@ public abstract class AbstractQueryExpressionVisitor<R, T> implements ILangVisit
     }
 
     @Override
-    public R visit(CreatePrimaryFeedStatement del, T arg) throws CompilationException {
+    public R visit(CreateFeedStatement cfs, T arg) throws CompilationException {
         return null;
     }
 
     @Override
-    public R visit(CreateSecondaryFeedStatement del, T arg) throws CompilationException {
+    public R visit(StartFeedStatement sfs, T arg) throws CompilationException {
+        return null;
+    }
+
+    @Override
+    public R visit(StopFeedStatement sfs, T arg) throws CompilationException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
index 4a5c5ed..cace925 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
@@ -44,10 +44,9 @@ import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
+import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -65,6 +64,8 @@ import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.statement.TypeDecl;
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.UpdateStatement;
@@ -152,9 +153,11 @@ public interface ILangVisitor<R, T> {
 
     R visit(ConnectFeedStatement del, T arg) throws CompilationException;
 
-    R visit(CreatePrimaryFeedStatement cpfs, T arg) throws CompilationException;
+    R visit(StartFeedStatement sfs, T arg) throws CompilationException;
 
-    R visit(CreateSecondaryFeedStatement csfs, T arg) throws CompilationException;
+    R visit(StopFeedStatement sfs, T arg) throws CompilationException;
+
+    R visit(CreateFeedStatement cfs, T arg) throws CompilationException;
 
     R visit(FeedDropStatement del, T arg) throws CompilationException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 81f00ee..d791c85 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -89,13 +89,13 @@ import org.apache.asterix.lang.common.literal.TrueLiteral;
 import org.apache.asterix.lang.common.parser.ScopeChecker;
 import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.lang.common.statement.StopFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
-import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement;
-import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -741,30 +741,16 @@ CreateFeedStatement FeedSpecification() throws ParseException:
   boolean ifNotExists = false;
   String adapterName = null;
   Map<String,String> properties = null;
-  FunctionSignature appliedFunction = null;
   CreateFeedStatement cfs = null;
   Pair<Identifier,Identifier> sourceNameComponents = null;
-
 }
 {
-  (
-    <SECONDARY> <FEED>  nameComponents = QualifiedName() ifNotExists = IfNotExists()
-      <FROM> <FEED> sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())?
-      {
-        cfs = new CreateSecondaryFeedStatement(nameComponents,
-                                   sourceNameComponents, appliedFunction, ifNotExists);
-      }
-     |
-     (<PRIMARY>)? <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists()
-      <USING> adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?
-       {
-        cfs = new CreatePrimaryFeedStatement(nameComponents,
-                                    adapterName, properties, appliedFunction, ifNotExists);
-       }
-  )
-    {
-      return cfs;
-    }
+  <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists()
+  <USING> adapterName = AdapterName() properties = Configuration()
+  {
+    cfs = new CreateFeedStatement(nameComponents, adapterName, properties, ifNotExists);
+    return cfs;
+  }
 }
 
 CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException:
@@ -1185,12 +1171,43 @@ Statement ConnectionStatement() throws ParseException:
   (
     <CONNECT> stmt = ConnectStatement()
   | <DISCONNECT> stmt = DisconnectStatement()
+  | <START> stmt = StartStatement()
+  | <STOP> stmt = StopStatement()
   )
   {
     return stmt;
   }
 }
 
+Statement StartStatement() throws ParseException:
+{
+  Pair<Identifier,Identifier> feedNameComponents = null;
+
+  Statement stmt = null;
+}
+{
+  <FEED> feedNameComponents = QualifiedName()
+  {
+    stmt = new StartFeedStatement (feedNameComponents);
+    return stmt;
+  }
+}
+
+Statement StopStatement () throws ParseException:
+{
+  Pair<Identifier,Identifier> feedNameComponents = null;
+
+  Statement stmt = null;
+}
+{
+  <FEED> feedNameComponents = QualifiedName()
+  {
+    stmt = new StopFeedStatement (feedNameComponents);
+    return stmt;
+  }
+}
+
+
 Statement DisconnectStatement() throws ParseException:
 {
   Pair<Identifier,Identifier> feedNameComponents = null;
@@ -1218,14 +1235,17 @@ Statement ConnectStatement() throws ParseException:
   Pair<Identifier,Identifier> datasetNameComponents = null;
 
   Map<String,String> configuration = null;
+  FunctionSignature appliedFunction = null;
   Statement stmt = null;
   String policy = null;
 }
 {
   (
-    <FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName() (policy = GetPolicy())?
+    <FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName()
+    (appliedFunction = ApplyFunction())?  (policy = GetPolicy())?
       {
-        stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, policy, getVarCounter());
+        stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunction,
+         policy, getVarCounter());
       }
   )
   {
@@ -3200,6 +3220,8 @@ TOKEN [IGNORE_CASE]:
   | <SELECT : "select">
   | <SET : "set">
   | <SOME : "some">
+  | <START : "start">
+  | <STOP : "stop">
   | <TEMPORARY : "temporary">
   | <THEN : "then">
   | <TYPE : "type">


Mime
View raw message