asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xi...@apache.org
Subject [1/3] asterixdb git commit: [ASTERIXDB-1983] Feed pipeline refactoring for SQL++
Date Sat, 21 Oct 2017 16:10:40 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 799046dfa -> 7e76a0797


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 692ba2e..a418cbf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -299,8 +299,8 @@ public class ExternalDataUtils {
         return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
     }
 
-    public static boolean isUpsertFeed(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_UPSERT_FEED));
+    public static boolean isInsertFeed(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_INSERT_FEED));
     }
 
     public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException
{

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index 9538711..f42c030 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -18,8 +18,14 @@
  */
 package org.apache.asterix.external.util;
 
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+
 public class FeedConstants {
 
+    public static final FunctionSignature FEED_COLLECT_FUN_SIGNATURE =
+            new FunctionSignature(FunctionConstants.ASTERIX_NS, "feed_collect", 6);
+
     public final static String FEEDS_METADATA_DV = "feeds_metadata";
     public final static String FAILED_TUPLE_DATASET = "failed_tuple";
     public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
@@ -31,7 +37,6 @@ public class FeedConstants {
         public static final String INTAKE_TIMESTAMP = "intake-timestamp";
         public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
         public static final String STORE_TIMESTAMP = "store-timestamp";
-
     }
 
     public static final class MessageConstants {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-aql/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/pom.xml b/asterixdb/asterix-lang-aql/pom.xml
index dd29e7c..8ed16ad 100644
--- a/asterixdb/asterix-lang-aql/pom.xml
+++ b/asterixdb/asterix-lang-aql/pom.xml
@@ -147,21 +147,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-active</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-metadata</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-external-data</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-common</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
index 47a9580..098b447 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
@@ -40,6 +41,10 @@ public class FunctionParser {
     }
 
     public FunctionDecl getFunctionDecl(Function function) throws CompilationException {
+        if (!function.getLanguage().equals(Function.LANGUAGE_AQL)) {
+            throw new CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE,
+                    Function.LANGUAGE_AQL, function.getLanguage());
+        }
         String functionBody = function.getFunctionBody();
         List<String> params = function.getParams();
         List<VarIdentifier> varIdentifiers = new ArrayList<VarIdentifier>();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
deleted file mode 100644
index f0539c6..0000000
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.lang.aql.statement;
-
-import java.io.StringReader;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.MetadataException;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.watch.FeedActivityDetails;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.lang.aql.parser.AQLParserFactory;
-import org.apache.asterix.lang.common.base.IParser;
-import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.InsertStatement;
-import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.lang.common.util.FunctionUtil;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-
-/**
- * Represents the AQL statement for subscribing to a feed.
- * This AQL statement is private and may not be used by the end-user.
- */
-public class SubscribeFeedStatement implements Statement {
-
-    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
-    private static final Integer INSERT_STATEMENT_POS = 3;
-    private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
-    private final int varCounter;
-    private final String[] locations;
-    private final FeedConnectionRequest connectionRequest;
-    private final IParserFactory parserFactory = new AQLParserFactory();
-    private Query query;
-
-    public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest)
{
-        this.connectionRequest = subscriptionRequest;
-        this.varCounter = 0;
-        this.locations = locations;
-    }
-
-    public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException
{
-        this.query = new Query(false);
-        EntityId sourceFeedId = connectionRequest.getReceivingFeedId();
-        Feed subscriberFeed =
-                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
-                        connectionRequest.getReceivingFeedId().getEntityName());
-        if (subscriberFeed == null) {
-            throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not
found.");
-        }
-
-        String feedOutputType = getOutputType(mdTxnCtx);
-        StringBuilder builder = new StringBuilder();
-        builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
-        builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" +
Boolean.TRUE + "'" + ";\n");
-        builder.append("set" + " " + FeedActivityDetails.FEED_POLICY_NAME + " " + "'" + connectionRequest.getPolicy()
-                + "'" + ";\n");
-
-        builder.append("insert into dataset " + connectionRequest.getTargetDataset() + "
");
-        builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse()
+ "'" + "," + "'"
-                + sourceFeedId.getEntityName() + "'" + "," + "'"
-                + connectionRequest.getReceivingFeedId().getEntityName() + "'" + "," + "'"
-                + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
-                + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType
+ "'" + ")");
-
-        List<FunctionSignature> functionsToApply = connectionRequest.getFunctionsToApply();
-        if ((functionsToApply != null) && functionsToApply.isEmpty()) {
-            builder.append(" return $x");
-        } else {
-            Function function;
-            String rValueName = "x";
-            String lValueName = "y";
-            int variableIndex = 0;
-            for (FunctionSignature appliedFunction : functionsToApply) {
-                function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
-                variableIndex++;
-                switch (function.getLanguage().toUpperCase()) {
-                    case Function.LANGUAGE_AQL:
-                        builder.append(" let " + "$" + lValueName + variableIndex + ":="
+ function.getName() + "("
-                                + "$" + rValueName + ")");
-                        rValueName = lValueName + variableIndex;
-                        break;
-                    case Function.LANGUAGE_JAVA:
-                        builder.append(" let " + "$" + lValueName + variableIndex + ":="
+ function.getName() + "("
-                                + "$" + rValueName + ")");
-                        rValueName = lValueName + variableIndex;
-                        break;
-                }
-                builder.append("\n");
-            }
-            builder.append("return $" + lValueName + variableIndex);
-        }
-        builder.append(")");
-        builder.append(";");
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Connect feed statement translated to\n" + builder.toString());
-        }
-        IParser parser = parserFactory.createParser(new StringReader(builder.toString()));
-
-        List<Statement> statements;
-        try {
-            statements = parser.parse();
-            query = ((InsertStatement) statements.get(INSERT_STATEMENT_POS)).getQuery();
-        } catch (CompilationException pe) {
-            throw new MetadataException(pe);
-        }
-
-    }
-
-    public Query getQuery() {
-        return query;
-    }
-
-    public int getVarCounter() {
-        return varCounter;
-    }
-
-    @Override
-    public byte getKind() {
-        return Statement.Kind.SUBSCRIBE_FEED;
-    }
-
-    public String getPolicy() {
-        return connectionRequest.getPolicy();
-    }
-
-    public FeedConnectionRequest getSubscriptionRequest() {
-        return connectionRequest;
-    }
-
-    @Override
-    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException
{
-        return null;
-    }
-
-    public String getDataverseName() {
-        return connectionRequest.getReceivingFeedId().getDataverse();
-    }
-
-    private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException
{
-        String outputType;
-        EntityId feedId = connectionRequest.getReceivingFeedId();
-        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName());
-        try {
-            outputType = FeedMetadataUtil
-                    .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME)
-                    .getTypeName();
-            return outputType;
-
-        } catch (MetadataException ae) {
-            throw new MetadataException(ae);
-        }
-    }
-
-    public String[] getLocations() {
-        return locations;
-    }
-
-    @Override
-    public byte getCategory() {
-        return Category.PROCEDURE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
index 351ac70..8d8ab73 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
@@ -123,7 +123,8 @@ public class FunctionUtil {
                 throw new CompilationException(messageBuilder.toString());
             }
 
-            if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
+            if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)
+                    || function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_SQLPP))
{
                 FunctionDecl functionDecl = functionParser.getFunctionDecl(function);
                 if (functionDecl != null) {
                     if (functionDecls.contains(functionDecl)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
index 2b24ea1..4c0aa24 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
@@ -23,6 +23,7 @@ import java.io.StringReader;
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
@@ -40,6 +41,10 @@ public class FunctionParser {
     }
 
     public FunctionDecl getFunctionDecl(Function function) throws CompilationException {
+        if (!function.getLanguage().equals(Function.LANGUAGE_SQLPP)) {
+            throw new CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE,
+                    Function.LANGUAGE_SQLPP, function.getLanguage());
+        }
         String functionBody = function.getFunctionBody();
         List<String> params = function.getParams();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index fa60bba..9156b0f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -63,6 +63,7 @@ import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.metadata.utils.InvertedIndexResourceFactoryProvider;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider;
+import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.RecordUtil;
@@ -85,6 +86,10 @@ import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
@@ -289,8 +294,31 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset
{
                 && Objects.equals(datasetName, otherDataset.datasetName);
     }
 
-    public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method
is meant to be extended
-        return !hasMetaPart();
+    public boolean allow(ILogicalOperator topOp, byte operation) {
+        if (!hasMetaPart()) {
+            return true;
+        }
+        if (topOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.ASSIGN)
{
+            return false;
+        }
+        ILogicalOperator op = topOp.getInputs().get(0).getValue();
+        while ((!op.getInputs().isEmpty())
+                && op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.UNNEST)
{
+            op = op.getInputs().get(0).getValue();
+        }
+        if (op.getInputs().isEmpty()) {
+            return false;
+        }
+        UnnestOperator unnestOp = (UnnestOperator) op.getInputs().get(0).getValue();
+        if (unnestOp.getExpressionRef().getValue().getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL)
{
+            return false;
+        }
+        AbstractFunctionCallExpression functionCall =
+                (AbstractFunctionCallExpression) unnestOp.getExpressionRef().getValue();
+        if (functionCall.getFunctionIdentifier() != BuiltinFunctions.FEED_COLLECT) {
+            return false;
+        }
+        return operation == DatasetUtil.OP_UPSERT;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
index 7ff423c..1d1db37 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
@@ -26,6 +26,7 @@ import org.apache.asterix.metadata.api.IMetadataEntity;
 public class Function implements IMetadataEntity<Function> {
     private static final long serialVersionUID = 1L;
     public static final String LANGUAGE_AQL = "AQL";
+    public static final String LANGUAGE_SQLPP = "SQLPP";
     public static final String LANGUAGE_JAVA = "JAVA";
 
     public static final String RETURNTYPE_VOID = "VOID";


Mime
View raw message