asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xikui Wang (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [WIP] Refactoring for Feed pipeline
Date Fri, 06 Oct 2017 23:37:12 GMT
Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2059

Change subject: [WIP] Refactoring for Feed pipeline
......................................................................

[WIP] Refactoring for Feed pipeline

Change-Id: I0ae5a837613780a4d2c90c98139fdc6d5e040cc9
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
D asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
D asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
R asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.aql
R asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.2.update.aql
R asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
R asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.3.server.sqlpp
R asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
R asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
C asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
R asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-function/connect-feed-with-function.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
M asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
40 files changed, 336 insertions(+), 417 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/59/2059/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 403c26b..068aa29 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -320,56 +320,6 @@
         }
     }
 
-    public static class CompiledConnectFeedStatement implements ICompiledDmlStatement {
-        private final String dataverseName;
-        private final String feedName;
-        private final String datasetName;
-        private final String policyName;
-        private final Query query;
-        private final int varCounter;
-
-        public CompiledConnectFeedStatement(String dataverseName, String feedName, String datasetName,
-                String policyName, Query query, int varCounter) {
-            this.dataverseName = dataverseName;
-            this.feedName = feedName;
-            this.datasetName = datasetName;
-            this.policyName = policyName;
-            this.query = query;
-            this.varCounter = varCounter;
-        }
-
-        @Override
-        public String getDataverseName() {
-            return dataverseName;
-        }
-
-        public String getFeedName() {
-            return feedName;
-        }
-
-        @Override
-        public String getDatasetName() {
-            return datasetName;
-        }
-
-        public int getVarCounter() {
-            return varCounter;
-        }
-
-        public Query getQuery() {
-            return query;
-        }
-
-        @Override
-        public byte getKind() {
-            return Statement.Kind.CONNECT_FEED;
-        }
-
-        public String getPolicyName() {
-            return policyName;
-        }
-    }
-
     public static class CompiledSubscribeFeedStatement implements ICompiledDmlStatement {
 
         private FeedConnectionRequest request;
@@ -402,38 +352,6 @@
         public byte getKind() {
             return Statement.Kind.SUBSCRIBE_FEED;
         }
-    }
-
-    public static class CompiledDisconnectFeedStatement implements ICompiledDmlStatement {
-        private final String dataverseName;
-        private final String datasetName;
-        private final String feedName;
-
-        public CompiledDisconnectFeedStatement(String dataverseName, String feedName, String datasetName) {
-            this.dataverseName = dataverseName;
-            this.feedName = feedName;
-            this.datasetName = datasetName;
-        }
-
-        @Override
-        public String getDataverseName() {
-            return dataverseName;
-        }
-
-        @Override
-        public String getDatasetName() {
-            return datasetName;
-        }
-
-        public String getFeedName() {
-            return feedName;
-        }
-
-        @Override
-        public byte getKind() {
-            return Statement.Kind.DISCONNECT_FEED;
-        }
-
     }
 
     public static class CompiledDeleteStatement implements ICompiledDmlStatement {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index a1c5cf4..794bf9b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -409,15 +409,6 @@
                     leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
                             additionalFilteringExpressions, assign);
                     break;
-                case Statement.Kind.CONNECT_FEED:
-                    leafOperator = translateConnectFeed(targetDatasource, varRef, varRefsForLoading,
-                            additionalFilteringExpressions, assign);
-                    break;
-                case Statement.Kind.SUBSCRIBE_FEED:
-                    leafOperator = translateSubscribeFeed((CompiledSubscribeFeedStatement) stmt, targetDatasource,
-                            unnestVar, topOp, exprs, resVar, varRefsForLoading, varRef, assign,
-                            additionalFilteringField, additionalFilteringAssign, additionalFilteringExpressions);
-                    break;
                 default:
                     throw new AlgebricksException("Unsupported statement kind " + stmt.getKind());
             }
@@ -427,18 +418,6 @@
         ILogicalPlan plan = new ALogicalPlanImpl(globalPlanRoots);
         eliminateSharedOperatorReferenceForPlan(plan);
         return plan;
-    }
-
-    private ILogicalOperator translateConnectFeed(DatasetDataSource targetDatasource,
-            Mutable<ILogicalExpression> varRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign) {
-        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
-        insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        insertOp.getInputs().add(new MutableObject<>(assign));
-        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
-        leafOperator.getInputs().add(new MutableObject<>(insertOp));
-        return leafOperator;
     }
 
     private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
@@ -455,100 +434,6 @@
         deleteOp.getInputs().add(new MutableObject<>(assign));
         ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(deleteOp));
-        return leafOperator;
-    }
-
-    private ILogicalOperator translateSubscribeFeed(CompiledSubscribeFeedStatement sfs,
-            DatasetDataSource targetDatasource, LogicalVariable unnestVar, ILogicalOperator topOp,
-            ArrayList<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading, Mutable<ILogicalExpression> varRef,
-            ILogicalOperator assign, List<String> additionalFilteringField, AssignOperator additionalFilteringAssign,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions) throws AlgebricksException {
-        // if the feed is a change feed (i.e, performs different operations), we need to project op variable
-        InsertDeleteUpsertOperator feedModificationOp;
-        AssignOperator metaAndKeysAssign;
-        List<LogicalVariable> metaAndKeysVars = null;
-        List<Mutable<ILogicalExpression>> metaAndKeysExprs = null;
-        List<Mutable<ILogicalExpression>> metaExpSingletonList = null;
-        Feed feed = metadataProvider.findFeed(sfs.getDataverseName(), sfs.getFeedName());
-        boolean isChangeFeed = ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
-        boolean isUpsertFeed = ExternalDataUtils.isUpsertFeed(feed.getAdapterConfiguration());
-
-        ProjectOperator project = (ProjectOperator) topOp;
-        if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
-            metaAndKeysVars = new ArrayList<>();
-            metaAndKeysExprs = new ArrayList<>();
-            if (targetDatasource.getDataset().hasMetaPart()) {
-                // add the meta function
-                IFunctionInfo finfoMeta = FunctionUtil.getFunctionInfo(BuiltinFunctions.META);
-                ScalarFunctionCallExpression metaFunction = new ScalarFunctionCallExpression(finfoMeta,
-                        new MutableObject<>(new VariableReferenceExpression(unnestVar)));
-                // create assign for the meta part
-                LogicalVariable metaVar = context.newVar();
-                metaExpSingletonList = new ArrayList<>(1);
-                metaExpSingletonList.add(new MutableObject<>(new VariableReferenceExpression(metaVar)));
-                metaAndKeysVars.add(metaVar);
-                metaAndKeysExprs.add(new MutableObject<>(metaFunction));
-                project.getVariables().add(metaVar);
-            }
-        }
-        if (isChangeFeed) {
-            varRefsForLoading.clear();
-            for (Mutable<ILogicalExpression> assignExpr : exprs) {
-                if (assignExpr.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                    AbstractFunctionCallExpression funcCall = (AbstractFunctionCallExpression) assignExpr.getValue();
-                    funcCall.substituteVar(resVar, unnestVar);
-                    LogicalVariable pkVar = context.newVar();
-                    metaAndKeysVars.add(pkVar);
-                    metaAndKeysExprs.add(new MutableObject<>(assignExpr.getValue()));
-                    project.getVariables().add(pkVar);
-                    varRefsForLoading.add(new MutableObject<>(new VariableReferenceExpression(pkVar)));
-                }
-            }
-            // A change feed, we don't need the assign to access PKs
-            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
-            // Create and add a new variable used for representing the original record
-            feedModificationOp.setPrevRecordVar(context.newVar());
-            feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
-            if (targetDatasource.getDataset().hasMetaPart()) {
-                List<LogicalVariable> metaVars = new ArrayList<>();
-                metaVars.add(context.newVar());
-                feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
-                List<Object> metaTypes = new ArrayList<>();
-                metaTypes.add(targetDatasource.getMetaItemType());
-                feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
-            }
-
-            if (additionalFilteringField != null) {
-                feedModificationOp.setPrevFilterVar(context.newVar());
-                feedModificationOp.setPrevFilterType(
-                        ((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
-                additionalFilteringAssign.getInputs().clear();
-                additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
-                feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
-            } else {
-                feedModificationOp.getInputs().add(assign.getInputs().get(0));
-            }
-        } else {
-            final InsertDeleteUpsertOperator.Kind opKind =
-                    isUpsertFeed ? InsertDeleteUpsertOperator.Kind.UPSERT : InsertDeleteUpsertOperator.Kind.INSERT;
-            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                    metaExpSingletonList, opKind, false);
-            if (isUpsertFeed) {
-                feedModificationOp.setPrevRecordVar(context.newVar());
-                feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
-            }
-            feedModificationOp.getInputs().add(new MutableObject<>(assign));
-        }
-        if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
-            metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
-            metaAndKeysAssign.getInputs().add(topOp.getInputs().get(0));
-            topOp.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
-        }
-        feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
-        leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
         return leafOperator;
     }
 
@@ -570,7 +455,6 @@
         if (targetDatasource.getDataset().hasMetaPart()) {
             if (returnExpression != null) {
                 throw new AlgebricksException("Returning not allowed on datasets with Meta records");
-
             }
             AssignOperator metaAndKeysAssign;
             List<LogicalVariable> metaAndKeysVars;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 583302b..4b78b93 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -50,7 +50,6 @@
 import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
 import org.apache.asterix.lang.common.base.IQueryRewriter;
 import org.apache.asterix.lang.common.base.IReturningStatement;
@@ -59,6 +58,7 @@
 import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.optimizer.base.FuzzyUtils;
@@ -117,7 +117,7 @@
             ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
                     CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
                     FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
-                    FuzzyUtils.SIM_THRESHOLD_PROP_NAME, SubscribeFeedStatement.WAIT_FOR_COMPLETION,
+                    FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
                     FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS, "inline_with",
                     "hash_merge", "output-record-type");
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 38e8a21..c0ce6ec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -67,8 +67,8 @@
     @Override
     public synchronized void remove(Dataset dataset) throws HyracksDataException {
         super.remove(dataset);
-        feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName())
-                && o.getDatasetName().equals(dataset.getDatasetName()));
+        feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName()) && o.getDatasetName()
+                .equals(dataset.getDatasetName()));
     }
 
     public synchronized void addFeedConnection(FeedConnection feedConnection) {
@@ -82,12 +82,8 @@
     @Override
     protected void doStart(MetadataProvider mdProvider) throws HyracksDataException {
         try {
-            ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-            IStorageComponentProvider storageComponentProvider = new StorageComponentProvider();
-            DefaultStatementExecutorFactory statementExecutorFactory = new DefaultStatementExecutorFactory();
-            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations.buildStartFeedJob(
-                    ((QueryTranslator) statementExecutor).getSessionOutput(), mdProvider, feed, feedConnections,
-                    compilationProvider, storageComponentProvider, statementExecutorFactory, hcc);
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
+                    FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
             JobSpecification feedJob = jobInfo.getLeft();
             WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING,
                     ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED));
@@ -119,8 +115,8 @@
             // Construct ActiveMessage
             for (int i = 0; i < getLocations().getLocations().length; i++) {
                 String intakeLocation = getLocations().getLocations()[i];
-                FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId, intakeLocation,
-                        i);
+                FeedOperations
+                        .SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId, intakeLocation, i);
             }
             eventSubscriber.sync();
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6b4483c..91d3e46 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -122,6 +122,7 @@
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -1669,7 +1670,8 @@
                 throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
             }
             Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
-                    cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
+                    cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
+                    rewriterFactory instanceof SqlppRewriterFactory ? Function.LANGUAGE_SQLPP : Function.LANGUAGE_AQL,
                     FunctionKind.SCALAR.toString());
             MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
 
@@ -2186,7 +2188,7 @@
                 throw new AlgebricksException("Feed" + feedName + " is already connected dataset " + datasetName);
             }
             fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName,
-                    outputType.toString());
+                    outputType.getTypeName());
             MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             if (listener != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index cc95770..6944920 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -20,6 +20,7 @@
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -33,32 +34,53 @@
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.IntegerLiteral;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.lang.sqlpp.clause.FromClause;
+import org.apache.asterix.lang.sqlpp.clause.FromTerm;
+import org.apache.asterix.lang.sqlpp.clause.SelectBlock;
+import org.apache.asterix.lang.sqlpp.clause.SelectClause;
+import org.apache.asterix.lang.sqlpp.clause.SelectElement;
+import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
+import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
+import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedConnection;
@@ -109,6 +131,8 @@
  */
 public class FeedOperations {
 
+    public static final String FEED_DATAFLOW_INTERMEIDATE_VAL_PREFIX = "int_val_for_feed_fun";
+
     private FeedOperations() {
     }
 
@@ -154,30 +178,69 @@
         return spec;
     }
 
-    private static JobSpecification getConnectionJob(SessionOutput sessionOutput, MetadataProvider metadataProvider,
-            FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider,
-            IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
-            IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException {
-        DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedConnection.getDataverseName()));
-        FeedConnectionRequest fcr =
-                new FeedConnectionRequest(FeedRuntimeType.INTAKE, feedConnection.getAppliedFunctions(),
-                        feedConnection.getDatasetName(), feedConnection.getPolicyName(), feedConnection.getFeedId());
-        SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, fcr);
-        subscribeStmt.initialize(metadataProvider.getMetadataTxnContext());
-        List<Statement> statements = new ArrayList<>();
-        statements.add(dataverseDecl);
-        statements.add(subscribeStmt);
-        IStatementExecutor translator = qtFactory.create(metadataProvider.getApplicationContext(), statements,
-                sessionOutput, compilationProvider, storageComponentProvider);
-        // configure the metadata provider
-        metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
-        metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy());
-        metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
-                StringUtils.join(subscribeStmt.getLocations(), ','));
+    private static List<Expression> addArgs(Object... args) {
+        List<Expression> argExprs = new ArrayList<>();
+        for (Object arg : args) {
+            if (arg instanceof Integer) {
+                argExprs.add(new LiteralExpr(new IntegerLiteral((Integer) arg)));
+            } else if (arg instanceof String) {
+                argExprs.add(new LiteralExpr(new StringLiteral((String) arg)));
+            } else if (arg instanceof VariableExpr){
+                argExprs.add((VariableExpr) arg);
+            }
+        }
+        return argExprs;
+    }
 
-        CompiledStatements.CompiledSubscribeFeedStatement csfs = new CompiledStatements.CompiledSubscribeFeedStatement(
-                subscribeStmt.getSubscriptionRequest(), subscribeStmt.getVarCounter());
-        return translator.rewriteCompileQuery(hcc, metadataProvider, subscribeStmt.getQuery(), csfs);
+    private static Query makeConnectionQuery(FeedConnection feedConnection) {
+        // Construct from clause
+        VarIdentifier fromVarId = SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
+        VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
+        // TODO: remove target feedid from args list (xikui)
+        // TODO: Get rid of this INTAKE thing
+        List<Expression> exprList =
+                addArgs(feedConnection.getDataverseName(), feedConnection.getFeedId().getEntityName(),
+                        feedConnection.getFeedId().getEntityName(), FeedRuntimeType.INTAKE.toString(),
+                        feedConnection.getDatasetName(), feedConnection.getOutputType());
+        CallExpr datasrouceCallFunction = new CallExpr(FeedConstants.FEED_COLLECT_FUN_SIGN, exprList);
+        FromTerm fromterm = new FromTerm(datasrouceCallFunction, fromTermLeftExpr, null, null);
+        FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+        // TODO: This can be the place to add select predicate for ingestion
+        // Attaching functions
+        int varIdx = 1;
+        VariableExpr previousVarExpr = fromTermLeftExpr;
+        ArrayList<LetClause> letClauses = new ArrayList<>();
+        for (FunctionSignature funcSig : feedConnection.getAppliedFunctions()) {
+            VarIdentifier intermediateVar = SqlppVariableUtil
+                    .toInternalVariableIdentifier(FEED_DATAFLOW_INTERMEIDATE_VAL_PREFIX + String.valueOf(varIdx));
+            VariableExpr intermediateVarExpr = new VariableExpr(intermediateVar);
+            CallExpr functionCallExpr = new CallExpr(funcSig, addArgs(previousVarExpr));
+            previousVarExpr = intermediateVarExpr;
+            LetClause letClause = new LetClause(intermediateVarExpr, functionCallExpr);
+            letClauses.add(letClause);
+            varIdx++;
+        }
+        // Constructing select clause
+        SelectElement selectElement = new SelectElement(previousVarExpr);
+        SelectClause selectClause = new SelectClause(selectElement, null, false);
+        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, null, null, null, null);
+        SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
+        SelectExpression body = new SelectExpression(null, selectSetOperation, null, null, true);
+        Query query = new Query(false, true, body, 0);
+        return query;
+    }
+
+    private static JobSpecification getConnectionJob(MetadataProvider metadataProvider, FeedConnection feedConn,
+            IStatementExecutor statementExecutor, IHyracksClientConnection hcc)
+            throws AlgebricksException, RemoteException, ACIDException {
+        metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, feedConn.getPolicyName());
+        Query feedConnQuery = makeConnectionQuery(feedConn);
+        UpsertStatement stmtUpsert = new UpsertStatement(new Identifier(feedConn.getDataverseName()),
+                new Identifier(feedConn.getDatasetName()), feedConnQuery, -1, null, null);
+        CompiledStatements.CompiledUpsertStatement clfrqs =
+                new CompiledStatements.CompiledUpsertStatement(feedConn.getDataverseName(), feedConn.getDatasetName(),
+                        feedConnQuery, stmtUpsert.getVarCounter(), null, null);
+        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs);
     }
 
     private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,
@@ -220,9 +283,9 @@
             String datasetName = feedConnections.get(iter1).getDatasetName();
             FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(), datasetName);
 
-            FeedPolicyEntity feedPolicyEntity =
-                    FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
-                            curFeedConnection.getPolicyName(), metadataProvider.getMetadataTxnContext());
+            FeedPolicyEntity feedPolicyEntity = FeedMetadataUtil
+                    .validateIfPolicyExists(curFeedConnection.getDataverseName(), curFeedConnection.getPolicyName(),
+                            metadataProvider.getMetadataTxnContext());
 
             for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) {
                 IOperatorDescriptor opDesc = entry.getValue();
@@ -275,8 +338,8 @@
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
-                    Pair<IOperatorDescriptor, Integer>>> entry : subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+                    .getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
@@ -358,24 +421,36 @@
         return jobSpec;
     }
 
+    private static IStatementExecutor getSQLPPTranslator(MetadataProvider metadataProvider,
+            SessionOutput sessionOutput) {
+        List<Statement> stmts = new ArrayList<>();
+        DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory();
+        IStatementExecutor translator = qtFactory
+                .create(metadataProvider.getApplicationContext(), stmts, sessionOutput, new SqlppCompilationProvider(),
+                        new StorageComponentProvider());
+        return translator;
+    }
+
     public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
-            SessionOutput sessionOutput, MetadataProvider metadataProvider, Feed feed,
-            List<FeedConnection> feedConnections, ILangCompilationProvider compilationProvider,
-            IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory,
-            IHyracksClientConnection hcc) throws Exception {
+            MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections,
+            IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception {
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
-        // TODO: Change the default Datasource to use all possible partitions
         Pair<JobSpecification, IAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
-        //TODO: Add feed policy accessor
         List<JobSpecification> jobsList = new ArrayList<>();
         // Construct the ingestion Job
         JobSpecification intakeJob = intakeInfo.getLeft();
         IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
         String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
+        // Add metadata configs
+        metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, Boolean.TRUE.toString());
+        metadataProvider.getConfig()
+                .put(FeedActivityDetails.COLLECT_LOCATIONS, StringUtils.join(ingestionLocations, ','));
+        // TODO: Once we deprecated AQL, this extra queryTranslator can be removed.
+        IStatementExecutor translator =
+                getSQLPPTranslator(metadataProvider, ((QueryTranslator) statementExecutor).getSessionOutput());
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
-            JobSpecification connectionJob = getConnectionJob(sessionOutput, metadataProvider, feedConnection,
-                    ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc);
+            JobSpecification connectionJob = getConnectionJob(metadataProvider, feedConnection, translator, hcc);
             jobsList.add(connectionJob);
         }
         return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
index 334dd52..17a4d4b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
@@ -19,5 +19,10 @@
  !-->
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
   <test-group name="failed">
+    <test-case FilePath="feeds">
+      <compilation-unit name="connect-feed-with-function">
+        <output-dir compare="Text">connect-feed-with-function</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
deleted file mode 100644
index dcf2278..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-use dataverse experiments;
-stop feed UserFeed;
-disconnect feed UserFeed from dataset TwitterUsers;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
deleted file mode 100644
index 1a06334..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-use dataverse experiments;
-
-for $x in dataset TwitterUsers
-order by $x.screen-name
-return $x.true_popularity;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.aql
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.ddl.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.aql
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.2.update.aql
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.2.update.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.2.update.aql
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.8.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
similarity index 96%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.8.ddl.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
index 7722945..46056b1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.8.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
@@ -24,5 +24,4 @@
  * Expected Res : Success
  * Date         : 29th Mar 2017
  */
-use dataverse experiments;
 drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
new file mode 100644
index 0000000..4de5415
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a feed and apply two functions in the
+ * workflow. The output of the first function can be used in
+ * the second function. The function parameter can have any
+ * name.
+ * Expected Res : Success
+ * Date         : 4th Oct 2017
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TwitterUser if not exists as open{
+    `screen-name`: string,
+    friends_count: int32,
+    name: string,
+    followers_count: int32
+};
+
+create dataset TwitterUsers(TwitterUser) primary key `screen-name`;
+
+create function test_func0(xyz) {
+    object_merge((case (xyz.followers_count > 25000) when true then {"popularity":"Good!"} else {"popularity":"Bad!"} end), xyz)
+};
+
+create function test_func1(anyname) {
+    object_merge((case (anyname.popularity = "Good!") when true then {"true_popularity":"Indeed Good!"} else {"true_popularity":"Indeed Bad!"} end), anyname)
+};
+
+create feed UserFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="TwitterUser"),
+    ("format"="adm"),
+    ("upsert-feed"="true")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
index 4ba1c81..9606396 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
@@ -16,13 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
+use experiments;
 
-stop 10001
\ No newline at end of file
+connect feed UserFeed to dataset TwitterUsers apply function test_func0,test_func1;
+
+start feed UserFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.3.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.3.server.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.3.server.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.3.server.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.4.sleep.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
similarity index 75%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.4.sleep.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
index dc5dae0..e50b429 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.4.sleep.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
@@ -16,12 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
+
 2000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
index 4ba1c81..136d142 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
@@ -16,13 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+use experiments;
+stop feed UserFeed;
+disconnect feed UserFeed from dataset TwitterUsers;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
index 4ba1c81..4b1dba7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
@@ -16,13 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
+use experiments;
 
-stop 10001
\ No newline at end of file
+select value t.true_popularity from TwitterUsers t
+order by t.`screen-name`;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
similarity index 75%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
index 4ba1c81..c3ba795 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
@@ -16,13 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-
 stop 10001
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
index 4ba1c81..ec08f08 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
@@ -16,13 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
new file mode 100644
index 0000000..9b0f2a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Apply user defined function to feed.
+ * Expected Res : Success
+ * Date         : 4th Oct 2017
+ */
+
+drop dataverse externallibtest if exists;
+create dataverse externallibtest;
+use externallibtest;
+
+create type TweetInputType as open {
+  id: string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+};
+
+create type TweetOutputType as open {
+  id: string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string,
+  topics : {{string}}
+};
+
+create feed TweetFeed
+using localfs
+(("type-name"="TweetInputType"),
+("path"="asterix_nc1://data/twitter/obamatweets.adm"),
+("format"="adm"));
+
+create dataset TweetsFeedIngest(TweetOutputType)
+primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
index 4ba1c81..d1e0e87 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
@@ -16,13 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+install externallibtest testlib target/data/externallib/asterix-external-data-testlib.zip
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
similarity index 77%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 4ba1c81..883cd7a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -17,12 +17,12 @@
  * under the License.
  */
 /*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
+ * Description  : Apply user defined function to feed.
  * Expected Res : Success
- * Date         : 29th Mar 2017
+ * Date         : 4th Oct 2017
  */
+use externallibtest;
 
-stop 10001
\ No newline at end of file
+connect feed TweetFeed to dataset TweetsFeedIngest apply function `testlib#parseTweet`;
+
+start feed TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
similarity index 77%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
index 4ba1c81..607e5bd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
@@ -17,12 +17,13 @@
  * under the License.
  */
 /*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
+ * Description  : Apply user defined function to feed.
  * Expected Res : Success
- * Date         : 29th Mar 2017
+ * Date         : 4th Oct 2017
  */
+// polltimeoutsecs=5
+use externallibtest;
 
-stop 10001
\ No newline at end of file
+select value t from TweetsFeedIngest t
+ORDER BY t.id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
index 4ba1c81..86af80f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
@@ -16,13 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+uninstall externallibtest testlib
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
index 4ba1c81..2a7acef 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
@@ -16,13 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
- * Description  : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date         : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+drop dataverse externallibtest if exists;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-function/connect-feed-with-function.1.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-function/connect-feed-with-function.1.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index fbd87b6..e8fba07 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets", "ReturnType": "TweetType: closed {\n  id: string,\n  username: string,\n  location: string,\n  text: string,\n  timestamp: string\n}\n", "AppliedFunctions": {{ "feeds.feed_processor" }}, "PolicyName": "Basic" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets", "ReturnType": "TweetType", "AppliedFunctions": {{ "feeds.feed_processor" }}, "PolicyName": "Basic" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index abbcaaa..47560f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -252,8 +252,9 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
-      <compilation-unit name="connect-feed-with-aql-function">
-        <output-dir compare="Text">connect-feed-with-aql-function</output-dir>
+      <compilation-unit name="connect-feed-with-function">
+        <output-dir compare="Text">connect-feed-with-function</output-dir>
+        <expected-error>Incompatible function language</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
index 83cdd82..8f4455f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
@@ -43,5 +43,10 @@
         <output-dir compare="Text">upperCase</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="feed-with-external-function">
+        <output-dir compare="Text">feed-with-external-function</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index c87c44b..0d7ef36 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8248,6 +8248,11 @@
         <output-dir compare="Text">upsert-feed</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="connect-feed-with-function">
+        <output-dir compare="Text">connect-feed-with-function</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="hdfs">
     <test-case FilePath="hdfs">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f960ce5..279624d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -129,6 +129,7 @@
     public static final int INDEX_ILLEGAL_REPETITIVE_FIELD = 1052;
     public static final int CANNOT_CREATE_SEC_PRIMARY_IDX_ON_EXT_DATASET = 1053;
     public static final int COMPILATION_FAILED_DUE_TO_REPLICATE_OP = 1054;
+    public static final int COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE = 1055;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 7362181..6ce78f0 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -115,6 +115,7 @@
 1052 = Cannot create index with the same field \"%1$s\" specified more than once.
 1053 = Cannot create primary index on external dataset.
 1054 = Compilation failed due to some problem in the query plan.
+1055 = Incompatible function language. Expect %1$s, but %2$s found.
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index 9538711..4f5c9b2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -18,7 +18,13 @@
  */
 package org.apache.asterix.external.util;
 
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+
 public class FeedConstants {
+
+    public static final FunctionSignature FEED_COLLECT_FUN_SIGN =
+            new FunctionSignature(FunctionConstants.ASTERIX_NS, "feed_collect", 6);
 
     public final static String FEEDS_METADATA_DV = "feeds_metadata";
     public final static String FAILED_TUPLE_DATASET = "failed_tuple";
@@ -31,7 +37,6 @@
         public static final String INTAKE_TIMESTAMP = "intake-timestamp";
         public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
         public static final String STORE_TIMESTAMP = "store-timestamp";
-
     }
 
     public static final class MessageConstants {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index dad0d51..1097a07 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -28,6 +28,8 @@
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
index 47a9580..332dd57 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
@@ -40,6 +41,10 @@
     }
 
     public FunctionDecl getFunctionDecl(Function function) throws CompilationException {
+        if (function.getLanguage() != Function.LANGUAGE_AQL) {
+            throw new CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE,
+                    Function.LANGUAGE_AQL, function.getLanguage());
+        }
         String functionBody = function.getFunctionBody();
         List<String> params = function.getParams();
         List<VarIdentifier> varIdentifiers = new ArrayList<VarIdentifier>();
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index f0539c6..a28c196 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -47,7 +47,9 @@
 /**
  * Represents the AQL statement for subscribing to a feed.
  * This AQL statement is private and may not be used by the end-user.
+ *
  */
+@Deprecated
 public class SubscribeFeedStatement implements Statement {
 
     public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
index 2b24ea1..3c1fe5a 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
@@ -40,6 +41,10 @@
     }
 
     public FunctionDecl getFunctionDecl(Function function) throws CompilationException {
+        if (function.getLanguage() != Function.LANGUAGE_SQLPP) {
+            throw new CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE,
+                    Function.LANGUAGE_SQLPP, function.getLanguage());
+        }
         String functionBody = function.getFunctionBody();
         List<String> params = function.getParams();
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index fa60bba..9156b0f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -63,6 +63,7 @@
 import org.apache.asterix.metadata.utils.InvertedIndexResourceFactoryProvider;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider;
+import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.RecordUtil;
@@ -85,6 +86,10 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
@@ -289,8 +294,31 @@
                 && Objects.equals(datasetName, otherDataset.datasetName);
     }
 
-    public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended
-        return !hasMetaPart();
+    public boolean allow(ILogicalOperator topOp, byte operation) {
+        if (!hasMetaPart()) {
+            return true;
+        }
+        if (topOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        ILogicalOperator op = topOp.getInputs().get(0).getValue();
+        while ((!op.getInputs().isEmpty())
+                && op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            op = op.getInputs().get(0).getValue();
+        }
+        if (op.getInputs().isEmpty()) {
+            return false;
+        }
+        UnnestOperator unnestOp = (UnnestOperator) op.getInputs().get(0).getValue();
+        if (unnestOp.getExpressionRef().getValue().getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression functionCall =
+                (AbstractFunctionCallExpression) unnestOp.getExpressionRef().getValue();
+        if (functionCall.getFunctionIdentifier() != BuiltinFunctions.FEED_COLLECT) {
+            return false;
+        }
+        return operation == DatasetUtil.OP_UPSERT;
     }
 
     /**
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
index 7ff423c..1d1db37 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
@@ -26,6 +26,7 @@
 public class Function implements IMetadataEntity<Function> {
     private static final long serialVersionUID = 1L;
     public static final String LANGUAGE_AQL = "AQL";
+    public static final String LANGUAGE_SQLPP = "SQLPP";
     public static final String LANGUAGE_JAVA = "JAVA";
 
     public static final String RETURNTYPE_VOID = "VOID";

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2059
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0ae5a837613780a4d2c90c98139fdc6d5e040cc9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkkwww@gmail.com>

Mime
View raw message