Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BFC55200D22 for ; Sat, 21 Oct 2017 18:10:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BE3EF160BEF; Sat, 21 Oct 2017 16:10:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8D3CE1609DF for ; Sat, 21 Oct 2017 18:10:41 +0200 (CEST) Received: (qmail 18248 invoked by uid 500); 21 Oct 2017 16:10:40 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 18239 invoked by uid 99); 21 Oct 2017 16:10:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 Oct 2017 16:10:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1672DFAE3; Sat, 21 Oct 2017 16:10:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xikui@apache.org To: commits@asterixdb.apache.org Date: Sat, 21 Oct 2017 16:10:40 -0000 Message-Id: <1bb9cad5976b4afda747d50ef6a59ea5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] asterixdb git commit: [ASTERIXDB-1983] Feed pipeline refactoring for SQL++ archived-at: Sat, 21 Oct 2017 16:10:42 -0000 Repository: asterixdb Updated Branches: refs/heads/master 799046dfa -> 7e76a0797 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 692ba2e..a418cbf 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -299,8 +299,8 @@ public class ExternalDataUtils { return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED)); } - public static boolean isUpsertFeed(Map configuration) { - return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_UPSERT_FEED)); + public static boolean isInsertFeed(Map configuration) { + return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_INSERT_FEED)); } public static int getNumberOfKeys(Map configuration) throws AsterixException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java index 9538711..f42c030 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java @@ -18,8 +18,14 @@ */ package org.apache.asterix.external.util; +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.common.functions.FunctionSignature; + public class FeedConstants { + public static final FunctionSignature FEED_COLLECT_FUN_SIGNATURE = + new FunctionSignature(FunctionConstants.ASTERIX_NS, "feed_collect", 6); + public final static String FEEDS_METADATA_DV = "feeds_metadata"; public final static String FAILED_TUPLE_DATASET = "failed_tuple"; public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType"; @@ -31,7 +37,6 @@ public class FeedConstants { public static final String INTAKE_TIMESTAMP = "intake-timestamp"; public static final String COMPUTE_TIMESTAMP = "compute-timestamp"; public static final String STORE_TIMESTAMP = "store-timestamp"; - } public static final class MessageConstants { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-aql/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/pom.xml b/asterixdb/asterix-lang-aql/pom.xml index dd29e7c..8ed16ad 100644 --- a/asterixdb/asterix-lang-aql/pom.xml +++ b/asterixdb/asterix-lang-aql/pom.xml @@ -147,21 +147,11 @@ org.apache.asterix - asterix-active - ${project.version} - - - org.apache.asterix asterix-metadata ${project.version} org.apache.asterix - asterix-external-data - ${project.version} - - - org.apache.asterix asterix-common ${project.version} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java index 47a9580..098b447 100644 --- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java +++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.IParserFactory; import org.apache.asterix.lang.common.base.Statement; @@ -40,6 +41,10 @@ public class FunctionParser { } public FunctionDecl getFunctionDecl(Function function) throws CompilationException { + if (!function.getLanguage().equals(Function.LANGUAGE_AQL)) { + throw new CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE, + Function.LANGUAGE_AQL, function.getLanguage()); + } String functionBody = function.getFunctionBody(); List params = function.getParams(); List varIdentifiers = new ArrayList(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java deleted file mode 100644 index f0539c6..0000000 --- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.lang.aql.statement; - -import java.io.StringReader; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.EntityId; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.MetadataException; -import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.external.feed.management.FeedConnectionRequest; -import org.apache.asterix.external.feed.watch.FeedActivityDetails; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.lang.aql.parser.AQLParserFactory; -import org.apache.asterix.lang.common.base.IParser; -import org.apache.asterix.lang.common.base.IParserFactory; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.statement.InsertStatement; -import org.apache.asterix.lang.common.statement.Query; -import org.apache.asterix.lang.common.util.FunctionUtil; -import org.apache.asterix.lang.common.visitor.base.ILangVisitor; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.entities.Feed; -import org.apache.asterix.metadata.entities.Function; -import org.apache.asterix.metadata.feeds.FeedMetadataUtil; - -/** - * Represents the AQL statement for subscribing to a feed. - * This AQL statement is private and may not be used by the end-user. - */ -public class SubscribeFeedStatement implements Statement { - - public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed"; - private static final Integer INSERT_STATEMENT_POS = 3; - private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName()); - private final int varCounter; - private final String[] locations; - private final FeedConnectionRequest connectionRequest; - private final IParserFactory parserFactory = new AQLParserFactory(); - private Query query; - - public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) { - this.connectionRequest = subscriptionRequest; - this.varCounter = 0; - this.locations = locations; - } - - public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException { - this.query = new Query(false); - EntityId sourceFeedId = connectionRequest.getReceivingFeedId(); - Feed subscriberFeed = - MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(), - connectionRequest.getReceivingFeedId().getEntityName()); - if (subscriberFeed == null) { - throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found."); - } - - String feedOutputType = getOutputType(mdTxnCtx); - StringBuilder builder = new StringBuilder(); - builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n"); - builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n"); - builder.append("set" + " " + FeedActivityDetails.FEED_POLICY_NAME + " " + "'" + connectionRequest.getPolicy() - + "'" + ";\n"); - - builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " "); - builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'" - + sourceFeedId.getEntityName() + "'" + "," + "'" - + connectionRequest.getReceivingFeedId().getEntityName() + "'" + "," + "'" - + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'" - + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")"); - - List functionsToApply = connectionRequest.getFunctionsToApply(); - if ((functionsToApply != null) && functionsToApply.isEmpty()) { - builder.append(" return $x"); - } else { - Function function; - String rValueName = "x"; - String lValueName = "y"; - int variableIndex = 0; - for (FunctionSignature appliedFunction : functionsToApply) { - function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction); - variableIndex++; - switch (function.getLanguage().toUpperCase()) { - case Function.LANGUAGE_AQL: - builder.append(" let " + "$" + lValueName + variableIndex + ":=" + function.getName() + "(" - + "$" + rValueName + ")"); - rValueName = lValueName + variableIndex; - break; - case Function.LANGUAGE_JAVA: - builder.append(" let " + "$" + lValueName + variableIndex + ":=" + function.getName() + "(" - + "$" + rValueName + ")"); - rValueName = lValueName + variableIndex; - break; - } - builder.append("\n"); - } - builder.append("return $" + lValueName + variableIndex); - } - builder.append(")"); - builder.append(";"); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Connect feed statement translated to\n" + builder.toString()); - } - IParser parser = parserFactory.createParser(new StringReader(builder.toString())); - - List statements; - try { - statements = parser.parse(); - query = ((InsertStatement) statements.get(INSERT_STATEMENT_POS)).getQuery(); - } catch (CompilationException pe) { - throw new MetadataException(pe); - } - - } - - public Query getQuery() { - return query; - } - - public int getVarCounter() { - return varCounter; - } - - @Override - public byte getKind() { - return Statement.Kind.SUBSCRIBE_FEED; - } - - public String getPolicy() { - return connectionRequest.getPolicy(); - } - - public FeedConnectionRequest getSubscriptionRequest() { - return connectionRequest; - } - - @Override - public R accept(ILangVisitor visitor, T arg) throws CompilationException { - return null; - } - - public String getDataverseName() { - return connectionRequest.getReceivingFeedId().getDataverse(); - } - - private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException { - String outputType; - EntityId feedId = connectionRequest.getReceivingFeedId(); - Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()); - try { - outputType = FeedMetadataUtil - .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME) - .getTypeName(); - return outputType; - - } catch (MetadataException ae) { - throw new MetadataException(ae); - } - } - - public String[] getLocations() { - return locations; - } - - @Override - public byte getCategory() { - return Category.PROCEDURE; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java index 351ac70..8d8ab73 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java @@ -123,7 +123,8 @@ public class FunctionUtil { throw new CompilationException(messageBuilder.toString()); } - if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) { + if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL) + || function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_SQLPP)) { FunctionDecl functionDecl = functionParser.getFunctionDecl(function); if (functionDecl != null) { if (functionDecls.contains(functionDecl)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java index 2b24ea1..4c0aa24 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java @@ -23,6 +23,7 @@ import java.io.StringReader; import java.util.List; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.IParserFactory; import org.apache.asterix.lang.common.base.Statement; @@ -40,6 +41,10 @@ public class FunctionParser { } public FunctionDecl getFunctionDecl(Function function) throws CompilationException { + if (!function.getLanguage().equals(Function.LANGUAGE_SQLPP)) { + throw new CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE, + Function.LANGUAGE_SQLPP, function.getLanguage()); + } String functionBody = function.getFunctionBody(); List params = function.getParams(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index fa60bba..9156b0f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -63,6 +63,7 @@ import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.metadata.utils.InvertedIndexResourceFactoryProvider; import org.apache.asterix.metadata.utils.MetadataUtil; import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider; +import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.utils.RecordUtil; @@ -85,6 +86,10 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; @@ -289,8 +294,31 @@ public class Dataset implements IMetadataEntity, IDataset { && Objects.equals(datasetName, otherDataset.datasetName); } - public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended - return !hasMetaPart(); + public boolean allow(ILogicalOperator topOp, byte operation) { + if (!hasMetaPart()) { + return true; + } + if (topOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.ASSIGN) { + return false; + } + ILogicalOperator op = topOp.getInputs().get(0).getValue(); + while ((!op.getInputs().isEmpty()) + && op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.UNNEST) { + op = op.getInputs().get(0).getValue(); + } + if (op.getInputs().isEmpty()) { + return false; + } + UnnestOperator unnestOp = (UnnestOperator) op.getInputs().get(0).getValue(); + if (unnestOp.getExpressionRef().getValue().getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + return false; + } + AbstractFunctionCallExpression functionCall = + (AbstractFunctionCallExpression) unnestOp.getExpressionRef().getValue(); + if (functionCall.getFunctionIdentifier() != BuiltinFunctions.FEED_COLLECT) { + return false; + } + return operation == DatasetUtil.OP_UPSERT; } /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7e76a079/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java index 7ff423c..1d1db37 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java @@ -26,6 +26,7 @@ import org.apache.asterix.metadata.api.IMetadataEntity; public class Function implements IMetadataEntity { private static final long serialVersionUID = 1L; public static final String LANGUAGE_AQL = "AQL"; + public static final String LANGUAGE_SQLPP = "SQLPP"; public static final String LANGUAGE_JAVA = "JAVA"; public static final String RETURNTYPE_VOID = "VOID";