asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [6/7] asterixdb git commit: Refactor General Active Classes
Date Fri, 22 Jul 2016 13:34:08 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index b8786d1..3689357 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -39,6 +39,7 @@ import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.aql.util.RangeMapBuilder;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Expression.Kind;
+import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.clause.LimitClause;
@@ -247,8 +248,8 @@ class LangExpressionToPlanTranslator
         }
 
         List<String> additionalFilteringField = DatasetUtils.getFilterField(targetDatasource.getDataset());
-        List<LogicalVariable> additionalFilteringVars = null;
-        List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions = null;
+        List<LogicalVariable> additionalFilteringVars;
+        List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions;
         List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
         AssignOperator additionalFilteringAssign = null;
         if (additionalFilteringField != null) {
@@ -280,7 +281,7 @@ class LangExpressionToPlanTranslator
     @SuppressWarnings("unchecked")
     @Override
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
-            throws AlgebricksException, AsterixException {
+            throws AlgebricksException {
         Pair<ILogicalOperator, LogicalVariable> p =
                 expr.accept(this, new MutableObject<>(new EmptyTupleSourceOperator()));
         ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>();
@@ -348,8 +349,8 @@ class LangExpressionToPlanTranslator
 
             AssignOperator assign = new AssignOperator(vars, exprs);
             List<String> additionalFilteringField = DatasetUtils.getFilterField(targetDatasource.getDataset());
-            List<LogicalVariable> additionalFilteringVars = null;
-            List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions = null;
+            List<LogicalVariable> additionalFilteringVars;
+            List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions;
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
             AssignOperator additionalFilteringAssign = null;
             if (additionalFilteringField != null) {
@@ -369,148 +370,31 @@ class LangExpressionToPlanTranslator
             }
 
             Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(resVar));
-            ILogicalOperator leafOperator = null;
-
+            ILogicalOperator leafOperator;
             switch (stmt.getKind()) {
-                case INSERT: {
-                    if (targetDatasource.getDataset().hasMetaPart()) {
-                        throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
-                                + ": insert into dataset is not supported on Datasets with Meta records");
-                    }
-                    InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                            varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
-                    insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    insertOp.getInputs().add(new MutableObject<>(assign));
-                    leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<>(insertOp));
+                case Statement.INSERT:
+                    leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
+                            additionalFilteringExpressions, assign);
                     break;
-                }
-                case UPSERT: {
-                    if (targetDatasource.getDataset().hasMetaPart()) {
-                        throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
-                                + ": upsert into dataset is not supported on Datasets with Meta records");
-                    }
-                    InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                            varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false);
-                    upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    upsertOp.getInputs().add(new MutableObject<>(assign));
-                    // Create and add a new variable used for representing the original record
-                    ARecordType recordType = (ARecordType) targetDatasource.getItemType();
-                    upsertOp.setPrevRecordVar(context.newVar());
-                    upsertOp.setPrevRecordType(recordType);
-                    if (additionalFilteringField != null) {
-                        upsertOp.setPrevFilterVar(context.newVar());
-                        upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
-                    }
-                    leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<>(upsertOp));
+                case Statement.UPSERT:
+                    leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
+                            additionalFilteringExpressions, assign, additionalFilteringField);
                     break;
-                }
-                case DELETE: {
-                    if (targetDatasource.getDataset().hasMetaPart()) {
-                        throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
-                                + ": delete from dataset is not supported on Datasets with Meta records");
-                    }
-                    InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                            varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
-                    deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    deleteOp.getInputs().add(new MutableObject<>(assign));
-                    leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<>(deleteOp));
+                case Statement.DELETE:
+                    leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
+                            additionalFilteringExpressions, assign);
                     break;
-                }
-                case CONNECT_FEED: {
-                    InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
-                            varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
-                    insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    insertOp.getInputs().add(new MutableObject<>(assign));
-                    leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<>(insertOp));
+                case Statement.CONNECT_FEED:
+                    leafOperator = translateConnectFeed(targetDatasource, varRef, varRefsForLoading,
+                            additionalFilteringExpressions, assign);
                     break;
-                }
-                case SUBSCRIBE_FEED: {
-                    // if the feed is a change feed (i.e, performs different operations), we need to project op variable
-                    CompiledSubscribeFeedStatement sfs = (CompiledSubscribeFeedStatement) stmt;
-                    InsertDeleteUpsertOperator feedModificationOp;
-                    AssignOperator metaAndKeysAssign = null;
-                    List<LogicalVariable> metaAndKeysVars = null;
-                    List<Mutable<ILogicalExpression>> metaAndKeysExprs = null;
-                    List<Mutable<ILogicalExpression>> metaExpSingletonList = null;
-                    boolean isChangeFeed =
-                            FeedMetadataUtil.isChangeFeed(metadataProvider, sfs.getDataverseName(), sfs.getFeedName());
-                    if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
-                        metaAndKeysVars = new ArrayList<>();
-                        metaAndKeysExprs = new ArrayList<>();
-                    }
-                    if (targetDatasource.getDataset().hasMetaPart()) {
-                        // add the meta function
-                        IFunctionInfo finfoMeta = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.META);
-                        ScalarFunctionCallExpression metaFunction = new ScalarFunctionCallExpression(finfoMeta,
-                                new MutableObject<>(new VariableReferenceExpression(unnestVar)));
-                        // create assign for the meta part
-                        LogicalVariable metaVar = context.newVar();
-                        metaExpSingletonList = new ArrayList<>(1);
-                        metaExpSingletonList.add(new MutableObject<>(new VariableReferenceExpression(metaVar)));
-                        metaAndKeysVars.add(metaVar);
-                        metaAndKeysExprs.add(new MutableObject<>(metaFunction));
-                        project.getVariables().add(metaVar);
-                    }
-                    if (isChangeFeed) {
-                        varRefsForLoading.clear();
-                        for (Mutable<ILogicalExpression> assignExpr : exprs) {
-                            if (assignExpr.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                                AbstractFunctionCallExpression funcCall =
-                                        (AbstractFunctionCallExpression) assignExpr.getValue();
-                                funcCall.substituteVar(resVar, unnestVar);
-                                LogicalVariable pkVar = context.newVar();
-                                metaAndKeysVars.add(pkVar);
-                                metaAndKeysExprs.add(new MutableObject<>(assignExpr.getValue()));
-                                project.getVariables().add(pkVar);
-                                varRefsForLoading.add(new MutableObject<>(new VariableReferenceExpression(pkVar)));
-                            }
-                        }
-                        // A change feed, we don't need the assign to access PKs
-                        feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                                metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
-                        // Create and add a new variable used for representing the original record
-                        feedModificationOp.setPrevRecordVar(context.newVar());
-                        feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
-                        if (targetDatasource.getDataset().hasMetaPart()) {
-                            List<LogicalVariable> metaVars = new ArrayList<>();
-                            metaVars.add(context.newVar());
-                            feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
-                            List<Object> metaTypes = new ArrayList<>();
-                            metaTypes.add(targetDatasource.getMetaItemType());
-                            feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
-                        }
-
-                        if (additionalFilteringField != null) {
-                            feedModificationOp.setPrevFilterVar(context.newVar());
-                            feedModificationOp.setPrevFilterType(((ARecordType) targetDatasource.getItemType())
-                                    .getFieldType(additionalFilteringField.get(0)));
-                            additionalFilteringAssign.getInputs().clear();
-                            additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
-                            feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
-                        } else {
-                            feedModificationOp.getInputs().add(assign.getInputs().get(0));
-                        }
-                    } else {
-                        feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                                metaExpSingletonList, InsertDeleteUpsertOperator.Kind.INSERT, false);
-                        feedModificationOp.getInputs().add(new MutableObject<>(assign));
-                    }
-                    if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
-                        metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
-                        metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
-                        project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
-                    }
-                    feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                    leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
+                case Statement.SUBSCRIBE_FEED:
+                    leafOperator = translateSubscribeFeed((CompiledSubscribeFeedStatement) stmt, targetDatasource,
+                            unnestVar, project, exprs, resVar, varRefsForLoading, varRef, assign,
+                            additionalFilteringField, additionalFilteringAssign, additionalFilteringExpressions);
                     break;
-                }
                 default:
-                    break;
+                    throw new AlgebricksException("Unsupported statement kind " + stmt.getKind());
             }
             topOp = leafOperator;
         }
@@ -520,6 +404,162 @@ class LangExpressionToPlanTranslator
         return plan;
     }
 
+    private ILogicalOperator translateConnectFeed(DatasetDataSource targetDatasource,
+            Mutable<ILogicalExpression> varRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign) {
+        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
+        insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+        insertOp.getInputs().add(new MutableObject<>(assign));
+        SinkOperator leafOperator = new SinkOperator();
+        leafOperator.getInputs().add(new MutableObject<>(insertOp));
+        return leafOperator;
+    }
+
+    private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading,
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
+            throws AlgebricksException {
+        if (targetDatasource.getDataset().hasMetaPart()) {
+            throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
+                    + ": delete from dataset is not supported on Datasets with Meta records");
+        }
+        InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
+        deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+        deleteOp.getInputs().add(new MutableObject<>(assign));
+        SinkOperator leafOperator = new SinkOperator();
+        leafOperator.getInputs().add(new MutableObject<>(deleteOp));
+        return leafOperator;
+    }
+
+    private ILogicalOperator translateSubscribeFeed(CompiledSubscribeFeedStatement sfs,
+            DatasetDataSource targetDatasource, LogicalVariable unnestVar, ProjectOperator project,
+            ArrayList<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, Mutable<ILogicalExpression> varRef,
+            ILogicalOperator assign, List<String> additionalFilteringField, AssignOperator additionalFilteringAssign,
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions) throws AlgebricksException {
+        // if the feed is a change feed (i.e, performs different operations), we need to project op variable
+        InsertDeleteUpsertOperator feedModificationOp;
+        AssignOperator metaAndKeysAssign;
+        List<LogicalVariable> metaAndKeysVars = null;
+        List<Mutable<ILogicalExpression>> metaAndKeysExprs = null;
+        List<Mutable<ILogicalExpression>> metaExpSingletonList = null;
+        boolean isChangeFeed =
+                FeedMetadataUtil.isChangeFeed(metadataProvider, sfs.getDataverseName(), sfs.getFeedName());
+        if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
+            metaAndKeysVars = new ArrayList<>();
+            metaAndKeysExprs = new ArrayList<>();
+            if (targetDatasource.getDataset().hasMetaPart()) {
+                // add the meta function
+                IFunctionInfo finfoMeta = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.META);
+                ScalarFunctionCallExpression metaFunction = new ScalarFunctionCallExpression(finfoMeta,
+                        new MutableObject<>(new VariableReferenceExpression(unnestVar)));
+                // create assign for the meta part
+                LogicalVariable metaVar = context.newVar();
+                metaExpSingletonList = new ArrayList<>(1);
+                metaExpSingletonList.add(new MutableObject<>(new VariableReferenceExpression(metaVar)));
+                metaAndKeysVars.add(metaVar);
+                metaAndKeysExprs.add(new MutableObject<>(metaFunction));
+                project.getVariables().add(metaVar);
+            }
+        }
+        if (isChangeFeed) {
+            varRefsForLoading.clear();
+            for (Mutable<ILogicalExpression> assignExpr : exprs) {
+                if (assignExpr.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression funcCall = (AbstractFunctionCallExpression) assignExpr.getValue();
+                    funcCall.substituteVar(resVar, unnestVar);
+                    LogicalVariable pkVar = context.newVar();
+                    metaAndKeysVars.add(pkVar);
+                    metaAndKeysExprs.add(new MutableObject<>(assignExpr.getValue()));
+                    project.getVariables().add(pkVar);
+                    varRefsForLoading.add(new MutableObject<>(new VariableReferenceExpression(pkVar)));
+                }
+            }
+            // A change feed, we don't need the assign to access PKs
+            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            // Create and add a new variable used for representing the original record
+            feedModificationOp.setPrevRecordVar(context.newVar());
+            feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
+            if (targetDatasource.getDataset().hasMetaPart()) {
+                List<LogicalVariable> metaVars = new ArrayList<>();
+                metaVars.add(context.newVar());
+                feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
+                List<Object> metaTypes = new ArrayList<>();
+                metaTypes.add(targetDatasource.getMetaItemType());
+                feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+            }
+
+            if (additionalFilteringField != null) {
+                feedModificationOp.setPrevFilterVar(context.newVar());
+                feedModificationOp.setPrevFilterType(
+                        ((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
+                additionalFilteringAssign.getInputs().clear();
+                additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
+                feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+            } else {
+                feedModificationOp.getInputs().add(assign.getInputs().get(0));
+            }
+        } else {
+            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.INSERT, false);
+            feedModificationOp.getInputs().add(new MutableObject<>(assign));
+        }
+        if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
+            metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
+            metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
+            project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
+        }
+        feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+        SinkOperator leafOperator = new SinkOperator();
+        leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
+        return leafOperator;
+    }
+
+    private ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading,
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
+            List<String> additionalFilteringField) throws AlgebricksException {
+        if (targetDatasource.getDataset().hasMetaPart()) {
+            throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
+                    + ": upsert into dataset is not supported on Datasets with Meta records");
+        }
+        InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+        upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+        upsertOp.getInputs().add(new MutableObject<>(assign));
+        // Create and add a new variable used for representing the original record
+        ARecordType recordType = (ARecordType) targetDatasource.getItemType();
+        upsertOp.setPrevRecordVar(context.newVar());
+        upsertOp.setPrevRecordType(recordType);
+        if (additionalFilteringField != null) {
+            upsertOp.setPrevFilterVar(context.newVar());
+            upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+        }
+        SinkOperator leafOperator = new SinkOperator();
+        leafOperator.getInputs().add(new MutableObject<>(upsertOp));
+        return leafOperator;
+    }
+
+    private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading,
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
+            throws AlgebricksException {
+        if (targetDatasource.getDataset().hasMetaPart()) {
+            throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
+                    + ": insert into dataset is not supported on Datasets with Meta records");
+        }
+        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
+        insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+        insertOp.getInputs().add(new MutableObject<>(assign));
+        SinkOperator leafOperator = new SinkOperator();
+        leafOperator.getInputs().add(new MutableObject<>(insertOp));
+        return leafOperator;
+    }
+
     private DatasetDataSource validateDatasetInfo(AqlMetadataProvider metadataProvider, String dataverseName,
             String datasetName) throws AlgebricksException {
         Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
@@ -534,9 +574,8 @@ class LangExpressionToPlanTranslator
         IAType metaItemType =
                 metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
         INodeDomain domain = metadataProvider.findNodeDomain(dataset.getNodeGroupName());
-        DatasetDataSource dataSource = new DatasetDataSource(sourceId, dataset, itemType, metaItemType,
-                AqlDataSourceType.INTERNAL_DATASET, dataset.getDatasetDetails(), domain);
-        return dataSource;
+        return new DatasetDataSource(sourceId, dataset, itemType, metaItemType, AqlDataSourceType.INTERNAL_DATASET,
+                dataset.getDatasetDetails(), domain);
     }
 
     private FileSplit getDefaultOutputFileLocation() throws MetadataException {
@@ -552,23 +591,17 @@ class LangExpressionToPlanTranslator
             throws AsterixException {
         LogicalVariable v;
         ILogicalOperator returnedOp;
-
-        switch (lc.getBindingExpr().getKind()) {
-            case VARIABLE_EXPRESSION: {
-                v = context.newVar(lc.getVarExpr());
-                LogicalVariable prev = context.getVar(((VariableExpr) lc.getBindingExpr()).getVar().getId());
-                returnedOp = new AssignOperator(v, new MutableObject<>(new VariableReferenceExpression(prev)));
-                returnedOp.getInputs().add(tupSource);
-                break;
-            }
-            default: {
-                v = context.newVar(lc.getVarExpr());
-                Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo =
-                        langExprToAlgExpression(lc.getBindingExpr(), tupSource);
-                returnedOp = new AssignOperator(v, new MutableObject<>(eo.first));
-                returnedOp.getInputs().add(eo.second);
-                break;
-            }
+        if (lc.getBindingExpr().getKind() == Kind.VARIABLE_EXPRESSION) {
+            v = context.newVar(lc.getVarExpr());
+            LogicalVariable prev = context.getVar(((VariableExpr) lc.getBindingExpr()).getVar().getId());
+            returnedOp = new AssignOperator(v, new MutableObject<>(new VariableReferenceExpression(prev)));
+            returnedOp.getInputs().add(tupSource);
+        } else {
+            v = context.newVar(lc.getVarExpr());
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo =
+                    langExprToAlgExpression(lc.getBindingExpr(), tupSource);
+            returnedOp = new AssignOperator(v, new MutableObject<>(eo.first));
+            returnedOp.getInputs().add(eo.second);
         }
         return new Pair<>(returnedOp, v);
     }
@@ -621,18 +654,16 @@ class LangExpressionToPlanTranslator
 
         for (Expression expr : fcall.getExprList()) {
             switch (expr.getKind()) {
-                case VARIABLE_EXPRESSION: {
+                case VARIABLE_EXPRESSION:
                     LogicalVariable var = context.getVar(((VariableExpr) expr).getVar().getId());
                     args.add(new MutableObject<>(new VariableReferenceExpression(var)));
                     break;
-                }
-                case LITERAL_EXPRESSION: {
+                case LITERAL_EXPRESSION:
                     LiteralExpr val = (LiteralExpr) expr;
                     args.add(new MutableObject<>(new ConstantExpression(
                             new AsterixConstantValue(ConstantHelper.objectFromLiteral(val.getValue())))));
                     break;
-                }
-                default: {
+                default:
                     Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = langExprToAlgExpression(expr, topOp);
                     AbstractLogicalOperator o1 = (AbstractLogicalOperator) eo.second.getValue();
                     args.add(new MutableObject<>(eo.first));
@@ -640,7 +671,6 @@ class LangExpressionToPlanTranslator
                         topOp = eo.second;
                     }
                     break;
-                }
             }
         }
 
@@ -677,7 +707,7 @@ class LangExpressionToPlanTranslator
         if (function == null) {
             return null;
         }
-        AbstractFunctionCallExpression f = null;
+        AbstractFunctionCallExpression f;
         if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_JAVA)) {
             IFunctionInfo finfo = ExternalFunctionCompilerUtil
                     .getExternalFunctionInfo(metadataProvider.getMetadataTxnContext(), function);
@@ -694,7 +724,7 @@ class LangExpressionToPlanTranslator
 
     private AbstractFunctionCallExpression lookupBuiltinFunction(String functionName, int arity,
             List<Mutable<ILogicalExpression>> args) {
-        AbstractFunctionCallExpression f = null;
+        AbstractFunctionCallExpression f;
         FunctionIdentifier fi = new FunctionIdentifier(AlgebricksBuiltinFunctions.ALGEBRICKS_NS, functionName, arity);
         AsterixFunctionInfo afi = AsterixBuiltinFunctions.lookupFunction(fi);
         FunctionIdentifier builtinAquafi = afi == null ? null : afi.getFunctionIdentifier();
@@ -1243,17 +1273,15 @@ class LangExpressionToPlanTranslator
     protected Pair<ILogicalExpression, Mutable<ILogicalOperator>> langExprToAlgExpression(Expression expr,
             Mutable<ILogicalOperator> topOpRef) throws AsterixException {
         switch (expr.getKind()) {
-            case VARIABLE_EXPRESSION: {
+            case VARIABLE_EXPRESSION:
                 VariableReferenceExpression ve =
                         new VariableReferenceExpression(context.getVar(((VariableExpr) expr).getVar().getId()));
                 return new Pair<>(ve, topOpRef);
-            }
-            case LITERAL_EXPRESSION: {
+            case LITERAL_EXPRESSION:
                 LiteralExpr val = (LiteralExpr) expr;
                 return new Pair<>(new ConstantExpression(
                         new AsterixConstantValue(ConstantHelper.objectFromLiteral(val.getValue()))), topOpRef);
-            }
-            default: {
+            default:
                 if (expressionNeedsNoNesting(expr)) {
                     Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, topOpRef);
                     ILogicalExpression exp = ((AssignOperator) p.first).getExpressions().get(0).getValue();
@@ -1279,7 +1307,6 @@ class LangExpressionToPlanTranslator
                         return new Pair<>(new VariableReferenceExpression(p.second), new MutableObject<>(s));
                     }
                 }
-            }
         }
     }
 
@@ -1320,11 +1347,9 @@ class LangExpressionToPlanTranslator
             Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = langExprToAlgExpression(exprs.get(i), topOp);
             topOp = p.second;
             // now look at the operator
-            if (i < nOps) {
-                if (ops.get(i) != opLogical) {
-                    throw new TranslationException(
-                            "Unexpected operator " + ops.get(i) + " in an OperatorExpr starting with " + opLogical);
-                }
+            if (i < nOps && ops.get(i) != opLogical) {
+                throw new TranslationException(
+                        "Unexpected operator " + ops.get(i) + " in an OperatorExpr starting with " + opLogical);
             }
             f.getArguments().add(new MutableObject<>(p.first));
         }
@@ -1346,7 +1371,7 @@ class LangExpressionToPlanTranslator
     }
 
     protected <T> List<T> mkSingletonArrayList(T item) {
-        ArrayList<T> array = new ArrayList<T>(1);
+        ArrayList<T> array = new ArrayList<>(1);
         array.add(item);
         return array;
     }
@@ -1356,22 +1381,16 @@ class LangExpressionToPlanTranslator
         argRefs.add(new MutableObject<>(expr));
         switch (expr.getExpressionTag()) {
             case CONSTANT:
-            case VARIABLE: {
+            case VARIABLE:
                 return new UnnestingFunctionCallExpression(
                         FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
-            }
-            case FUNCTION_CALL: {
+            case FUNCTION_CALL:
                 AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-                if (fce.getKind() == FunctionKind.UNNEST) {
-                    return expr;
-                } else {
-                    return new UnnestingFunctionCallExpression(
-                            FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
-                }
-            }
-            default: {
+                return (fce.getKind() == FunctionKind.UNNEST) ? expr
+                        : new UnnestingFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
+            default:
                 return expr;
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 6fb82ef..bef8a3a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -41,7 +41,7 @@ import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
 import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
 import org.apache.asterix.lang.common.base.IQueryRewriter;
 import org.apache.asterix.lang.common.base.IRewriterFactory;
-import org.apache.asterix.lang.common.base.Statement.Kind;
+import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
@@ -204,12 +204,12 @@ public class APIFramework {
 
         org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
         queryMetadataProvider.setJobId(asterixJobId);
-        ILangExpressionToPlanTranslator t = translatorFactory.createExpressionToPlanTranslator(queryMetadataProvider,
-                varCounter);
+        ILangExpressionToPlanTranslator t =
+                translatorFactory.createExpressionToPlanTranslator(queryMetadataProvider, varCounter);
 
         ILogicalPlan plan;
         // statement = null when it's a query
-        if (statement == null || statement.getKind() != Kind.LOAD) {
+        if (statement == null || statement.getKind() != Statement.LOAD) {
             plan = t.translate(rwQ, outputDatasetName, statement);
         } else {
             plan = t.translateLoad(statement);
@@ -219,7 +219,7 @@ public class APIFramework {
             conf.out().println();
 
             printPlanPrefix(conf, "Logical plan");
-            if (rwQ != null || (statement != null && statement.getKind() == Kind.LOAD)) {
+            if (rwQ != null || (statement != null && statement.getKind() == Statement.LOAD)) {
                 LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
             }
@@ -243,8 +243,8 @@ public class APIFramework {
         OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
         OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
 
-        HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
-                AqlOptimizationContextFactory.INSTANCE);
+        HeuristicCompilerFactoryBuilder builder =
+                new HeuristicCompilerFactoryBuilder(AqlOptimizationContextFactory.INSTANCE);
         builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
         builder.setLogicalRewrites(buildDefaultLogicalRewrites());
         builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
@@ -271,7 +271,7 @@ public class APIFramework {
                     PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
                 } else {
                     printPlanPrefix(conf, "Optimized logical plan");
-                    if (rwQ != null || (statement != null && statement.getKind() == Kind.LOAD)) {
+                    if (rwQ != null || (statement != null && statement.getKind() == Statement.LOAD)) {
                         LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
                         PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
                     }
@@ -315,8 +315,8 @@ public class APIFramework {
         builder.setTypeTraitProvider(format.getTypeTraitProvider());
         builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
 
-        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(asterixJobId,
-                queryMetadataProvider.isWriteTransaction());
+        JobEventListenerFactory jobEventListenerFactory =
+                new JobEventListenerFactory(asterixJobId, queryMetadataProvider.isWriteTransaction());
         JobSpecification spec = compiler.createJob(AsterixAppContextInfo.getInstance(), jobEventListenerFactory);
 
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 1545325..4bb09d4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -52,7 +53,6 @@ import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
@@ -121,7 +121,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
-    private FeedManager feedManager;
+    private ActiveManager activeManager;
 
     private IReplicationChannel replicationChannel;
     private IReplicationManager replicationManager;
@@ -151,13 +151,13 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         txnProperties = new AsterixTransactionProperties(propertiesAccessor);
         feedProperties = new AsterixFeedProperties(propertiesAccessor);
         buildProperties = new AsterixBuildProperties(propertiesAccessor);
-        replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
-                AsterixClusterProperties.INSTANCE.getCluster());
+        replicationProperties =
+                new AsterixReplicationProperties(propertiesAccessor, AsterixClusterProperties.INSTANCE.getCluster());
         this.metadataRmiPort = metadataRmiPort;
     }
 
     @Override
-    public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
+    public void initialize(boolean initialRun) throws IOException, ACIDException {
         Logger.getLogger("org.apache").setLevel(externalProperties.getLogLevel());
 
         threadExecutor = new AsterixThreadExecutor(ncApplicationContext.getThreadFactory());
@@ -173,14 +173,15 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
-        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
+                new PersistentLocalResourceRepositoryFactory(ioManager, ncApplicationContext.getNodeId(),
+                        metadataProperties);
 
-        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
-                .createRepository();
+        localResourceRepository =
+                (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
 
-        IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
-                this);
+        IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider =
+                new AsterixAppRuntimeContextProviderForRecovery(this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
 
@@ -198,8 +199,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
         isShuttingdown = false;
 
-        feedManager = new FeedManager(ncApplicationContext.getNodeId(), feedProperties,
-                compilerProperties.getFrameSize());
+        activeManager = new ActiveManager(ncApplicationContext.getNodeId(),
+                feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
 
         if (replicationProperties.isReplicationEnabled()) {
             String nodeId = ncApplicationContext.getNodeId();
@@ -238,9 +239,9 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
             remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties);
 
-            bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
-                    storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory(),
-                    replicationManager);
+            bufferCache =
+                    new BufferCache(ioManager, prs, pcp, fileMapManager, storageProperties.getBufferCacheMaxOpenFiles(),
+                            ncApplicationContext.getThreadFactory(), replicationManager);
         } else {
             bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
                     storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
@@ -386,8 +387,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     }
 
     @Override
-    public FeedManager getFeedManager() {
-        return feedManager;
+    public ActiveManager getFeedManager() {
+        return activeManager;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
index f9dc45c..e799dd0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/AQLAPIServlet.java
@@ -18,39 +18,30 @@
  */
 package org.apache.asterix.api.http.servlet;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.common.base.Statement.Kind;
+import org.apache.asterix.lang.common.base.Statement;
 
 public class AQLAPIServlet extends RESTAPIServlet {
 
     private static final long serialVersionUID = 1L;
-
     private static final String AQL_STMT_PARAM_NAME = "aql";
-
-    private static final List<Kind> allowedStatements = new ArrayList<>();
+    private static final List<Byte> allowedStatements = Statement.VALUES;
 
     public AQLAPIServlet(ILangCompilationProvider compilationProvider) {
         super(compilationProvider);
     }
 
-    static {
-        for (Kind k : Kind.values()) {
-            allowedStatements.add(k);
-        }
-    }
-
     @Override
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter(AQL_STMT_PARAM_NAME);
     }
 
     @Override
-    protected List<Kind> getAllowedStatements() {
+    protected List<Byte> getAllowedStatements() {
         return allowedStatements;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
index 0254d6d..ec47276 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/DDLAPIServlet.java
@@ -25,7 +25,6 @@ import javax.servlet.http.HttpServletRequest;
 
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class DDLAPIServlet extends RESTAPIServlet {
     private static final long serialVersionUID = 1L;
@@ -34,19 +33,23 @@ public class DDLAPIServlet extends RESTAPIServlet {
         super(compilationProvider);
     }
 
+    @Override
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter("ddl");
     }
 
-    protected List<Statement.Kind> getAllowedStatements() {
-        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DATAVERSE_DROP, Kind.DATASET_DECL, Kind.NODEGROUP_DECL,
-                Kind.NODEGROUP_DROP, Kind.TYPE_DECL, Kind.TYPE_DROP, Kind.CREATE_INDEX, Kind.INDEX_DECL,
-                Kind.CREATE_DATAVERSE, Kind.DATASET_DROP, Kind.INDEX_DROP, Kind.CREATE_FUNCTION, Kind.FUNCTION_DROP,
-                Kind.CREATE_PRIMARY_FEED, Kind.CREATE_SECONDARY_FEED, Kind.DROP_FEED, Kind.CREATE_FEED_POLICY,
-                Kind.DROP_FEED_POLICY };
+    @Override
+    protected List<Byte> getAllowedStatements() {
+        Byte[] statementsArray = { Statement.DATAVERSE_DECL, Statement.DATAVERSE_DROP, Statement.DATASET_DECL,
+                Statement.NODEGROUP_DECL, Statement.NODEGROUP_DROP, Statement.TYPE_DECL, Statement.TYPE_DROP,
+                Statement.CREATE_INDEX, Statement.INDEX_DECL, Statement.CREATE_DATAVERSE, Statement.DATASET_DROP,
+                Statement.INDEX_DROP, Statement.CREATE_FUNCTION, Statement.FUNCTION_DROP, Statement.CREATE_PRIMARY_FEED,
+                Statement.CREATE_SECONDARY_FEED, Statement.DROP_FEED, Statement.CREATE_FEED_POLICY,
+                Statement.DROP_FEED_POLICY };
         return Arrays.asList(statementsArray);
     }
 
+    @Override
     protected String getErrorMessage() {
         return "Invalid statement: Non-DDL statement %s to the DDL API.";
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index 8bc613a..78c68e1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -32,13 +32,14 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.watch.FeedActivity;
 import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
 
 public class FeedServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
+    private static final String FEED_EXTENSION_NAME = "Feed";
 
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
@@ -100,7 +101,8 @@ public class FeedServlet extends HttpServlet {
         String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
 
         FeedConnectionId connectionId = new FeedConnectionId(
-                new FeedId(activity.getDataverseName(), activity.getFeedName()), activity.getDatasetName());
+                new EntityId(FEED_EXTENSION_NAME, activity.getDataverseName(), activity.getFeedName()),
+                activity.getDatasetName());
         int intakeRate = 0;
         int storeRate = 0;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
index 9ada5ff..040ac1a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryAPIServlet.java
@@ -25,7 +25,6 @@ import javax.servlet.http.HttpServletRequest;
 
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class QueryAPIServlet extends RESTAPIServlet {
     private static final long serialVersionUID = 1L;
@@ -34,16 +33,19 @@ public class QueryAPIServlet extends RESTAPIServlet {
         super(compilationProvider);
     }
 
+    @Override
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter("query");
     }
 
-    protected List<Statement.Kind> getAllowedStatements() {
-        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE,
-                Kind.RUN };
+    @Override
+    protected List<Byte> getAllowedStatements() {
+        Byte[] statementsArray = { Statement.DATAVERSE_DECL, Statement.FUNCTION_DECL, Statement.QUERY, Statement.SET,
+                Statement.WRITE, Statement.RUN };
         return Arrays.asList(statementsArray);
     }
 
+    @Override
     protected String getErrorMessage() {
         return "Invalid statement: Non-query statement %s to the query API.";
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
index 95e55f6..7ca1442 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
@@ -40,7 +40,6 @@ import org.apache.asterix.lang.aql.parser.TokenMgrError;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Statement.Kind;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.result.ResultReader;
 import org.apache.asterix.result.ResultUtils;
@@ -197,8 +196,8 @@ abstract class RESTAPIServlet extends HttpServlet {
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             String errorMessage = ResultUtils.buildParseExceptionMessage(pe, query);
-            JSONObject errorResp = ResultUtils.getErrorResponse(2, errorMessage, "",
-                    ResultUtils.extractFullStackTrace(pe));
+            JSONObject errorResp =
+                    ResultUtils.getErrorResponse(2, errorMessage, "", ResultUtils.extractFullStackTrace(pe));
             sessionConfig.out().write(errorResp.toString());
             response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
         } catch (Exception e) {
@@ -231,7 +230,7 @@ abstract class RESTAPIServlet extends HttpServlet {
 
     protected abstract String getQueryParameter(HttpServletRequest request);
 
-    protected abstract List<Kind> getAllowedStatements();
+    protected abstract List<Byte> getAllowedStatements();
 
     protected abstract String getErrorMessage();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
index 5c76c40..9f762f8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
@@ -25,7 +25,6 @@ import javax.servlet.http.HttpServletRequest;
 
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Statement.Kind;
 
 public class UpdateAPIServlet extends RESTAPIServlet {
     private static final long serialVersionUID = 1L;
@@ -40,10 +39,11 @@ public class UpdateAPIServlet extends RESTAPIServlet {
     }
 
     @Override
-    protected List<Statement.Kind> getAllowedStatements() {
-        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DELETE, Kind.INSERT, Kind.UPSERT, Kind.UPDATE,
-                Kind.DML_CMD_LIST, Kind.LOAD, Kind.CONNECT_FEED, Kind.DISCONNECT_FEED, Kind.SET, Kind.COMPACT,
-                Kind.EXTERNAL_DATASET_REFRESH, Kind.RUN };
+    protected List<Byte> getAllowedStatements() {
+        Byte[] statementsArray =
+                { Statement.DATAVERSE_DECL, Statement.DELETE, Statement.INSERT, Statement.UPSERT, Statement.UPDATE,
+                        Statement.DML_CMD_LIST, Statement.LOAD, Statement.CONNECT_FEED, Statement.DISCONNECT_FEED,
+                        Statement.SET, Statement.COMPACT, Statement.EXTERNAL_DATASET_REFRESH, Statement.RUN };
         return Arrays.asList(statementsArray);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
new file mode 100644
index 0000000..c44ffc1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+
+public class ActiveLifecycleListener implements IJobLifecycleListener {
+
+    private static final Logger LOGGER = Logger.getLogger(ActiveLifecycleListener.class.getName());
+    public static final ActiveLifecycleListener INSTANCE = new ActiveLifecycleListener();
+
+    private final LinkedBlockingQueue<ActiveEvent> jobEventInbox;
+    private final ExecutorService executorService;
+
+    private ActiveLifecycleListener() {
+        jobEventInbox = ActiveJobNotificationHandler.INSTANCE.getEventInbox();
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(ActiveJobNotificationHandler.INSTANCE);
+    }
+
+    @Override
+    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+        if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(jobId)) {
+            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_START));
+        }
+    }
+
+    @Override
+    public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
+        if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(jobId)) {
+            jobEventInbox.add(new ActiveEvent(jobId, ActiveEvent.EventKind.JOB_FINISH));
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+        ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, acggf.getJobSpecification());
+    }
+
+    public void receive(ActivePartitionMessage message) {
+        if (ActiveJobNotificationHandler.INSTANCE.isActiveJob(message.getJobId())) {
+            jobEventInbox.add(new ActiveEvent(message.getJobId(), ActiveEvent.EventKind.PARTITION_EVENT,
+                    message.getFeedId(), message.getPayload()));
+        }
+    }
+
+    public void stop() {
+        executorService.shutdown();
+    }
+}


Mime
View raw message