From: amoudi@apache.org
To: commits@asterixdb.apache.org
Reply-To: dev@asterixdb.apache.org
Date: Mon, 12 Sep 2016 21:13:49 -0000
Subject: [2/2] asterixdb git commit: ASTERIXDB-1451: Remove Record Casting for insert/delete/upsert

ASTERIXDB-1451: Remove Record Casting for insert/delete/upsert

This change includes the following:
- Introduce cast function in case of delete operation after the
  primary index to ensure types are passed correctly to enforced indexes.
- Introduce cast function in case of upsert operation before old
  secondary keys extraction to ensure types are passed correctly to
  enforced indexes.
- Replace all record casts with open field casts for insert/delete/upsert
  operations.
- Sonar-Qube fixes.

Change-Id: I6a80105798ea1c86a6a0eb69a79b9573b54931b7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1146
Tested-by: Jenkins
Integration-Tests: Jenkins
Reviewed-by: abdullah alamoudi


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/534d5892
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/534d5892
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/534d5892

Branch: refs/heads/master
Commit: 534d589297cc4fadbd61b4de70af5c406299814c
Parents: 11a0294
Author: Abdullah Alamoudi
Authored: Mon Sep 12 21:12:36 2016 +0300
Committer: abdullah alamoudi
Committed: Mon Sep 12 14:13:03 2016 -0700

----------------------------------------------------------------------
 ...IntroduceSecondaryIndexInsertDeleteRule.java | 801 +++++++++----------
 .../rules/ReplaceSinkOpWithCommitOpRule.java    |   2 +-
 .../rules/typecast/StaticTypeCastUtil.java      |   3 +-
 .../asterix/app/translator/QueryTranslator.java | 112 ++-
 .../results/disjunction-to-join-delete-1.plan   |  29 +-
 .../results/disjunction-to-join-delete-2.plan   |  41 +-
 .../results/disjunction-to-join-delete-3.plan   |  39 +-
 .../insert-and-scan-dataset-with-index.plan     |  25 +-
 .../scan-delete-rtree-secondary-index.plan      |  25 +-
 .../results/scan-insert-secondary-index.plan    |  20 +-
 .../skip-ngram-index-search-in-delete.plan      |  25 +-
 .../skip-rtree-index-search-in-delete.plan      |  25 +-
 ...-secondary-btree-index-search-in-delete.plan |  25 +-
 .../skip-word-index-search-in-delete.plan       |  25 +-
 .../enforced-type-delete.1.ddl.aql              |  33 +
 .../enforced-type-delete.2.update.aql           |  29 +
 .../enforced-type-delete.3.ddl.aql              |  27 +
 .../enforced-type-delete.4.update.aql           |  32 +
 .../enforced-type-delete.5.query.aql            |  29 +
 .../enforced-type-upsert.1.ddl.aql              |  33 +
 .../enforced-type-upsert.2.update.aql           |  29 +
 .../enforced-type-upsert.3.ddl.aql              |  27 +
 .../enforced-type-upsert.4.update.aql           |  29 +
 .../enforced-type-upsert.5.query.aql            |  29 +
 .../enforced-type-delete.5.adm                  |   1 +
 .../enforced-type-upsert.5.adm                  |   1 +
 .../src/test/resources/runtimets/testsuite.xml  |  12 +
 .../apache/asterix/metadata/entities/Index.java |   2 +
 .../om/typecomputer/base/TypeCastUtils.java     |  18 +-
 .../om/typecomputer/impl/CastTypeComputer.java  |  16 +-
 .../IndexInsertDeleteUpsertOperator.java        |   5 +-
 .../logical/InsertDeleteUpsertOperator.java     |   6 +-
 .../logical/visitors/UsedVariableVisitor.java   |   7 +-
 .../LogicalOperatorPrettyPrintVisitor.java      |  12 +-
 34 files changed, 952 insertions(+), 622 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/534d5892/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index c64258f..c487a96 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -20,10 +20,9 @@ package org.apache.asterix.optimizer.rules;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Stack;
+import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -32,25 +31,22 @@ import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlIndex;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -77,6 +73,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOpe
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+/**
+ * This rule matches the pattern:
+ * assign --> insert-delete-upsert --> sink
+ * and produces
+ * assign -->
insert-delete-upsert --> *(secondary indexes index-insert-delete-upsert) --> sink + */ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewriteRule { @Override @@ -88,71 +90,38 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit @Override public boolean rewritePost(Mutable opRef, IOptimizationContext context) throws AlgebricksException { - AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue(); - if (op0.getOperatorTag() != LogicalOperatorTag.SINK) { + AbstractLogicalOperator sinkOp = (AbstractLogicalOperator) opRef.getValue(); + if (sinkOp.getOperatorTag() != LogicalOperatorTag.SINK) { return false; } - AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue(); - if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) { + if (sinkOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) { return false; } - - FunctionIdentifier fid = null; /** find the record variable */ - InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) op1; - boolean isBulkload = insertOp.isBulkload(); - ILogicalExpression recordExpr = insertOp.getPayloadExpression().getValue(); - List> metaExprs = insertOp.getAdditionalNonFilteringExpressions(); - LogicalVariable recordVar = null; - LogicalVariable metaVar = null; - List usedRecordVars = new ArrayList<>(); - /** assume the payload is always a single variable expression */ - recordExpr.getUsedVariables(usedRecordVars); - if (usedRecordVars.size() == 1) { - recordVar = usedRecordVars.get(0); - } - if (metaExprs != null) { - List metaVars = new ArrayList<>(); - for (Mutable expr : metaExprs) { - expr.getValue().getUsedVariables(metaVars); - } - if (metaVars.size() > 1) { - throw new AlgebricksException( - "Number of meta fields can't be more than 1. 
Number of meta fields found = " + metaVars.size()); - } - metaVar = metaVars.get(0); - } + InsertDeleteUpsertOperator primaryIndexModificationOp = + (InsertDeleteUpsertOperator) sinkOp.getInputs().get(0).getValue(); + boolean isBulkload = primaryIndexModificationOp.isBulkload(); + ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue(); + List> newMetaExprs = + primaryIndexModificationOp.getAdditionalNonFilteringExpressions(); + LogicalVariable newRecordVar; + LogicalVariable newMetaVar = null; /** - * op2 is the assign operator which extracts primary keys from the input + * inputOp is the assign operator which extracts primary keys from the input * variables (record or meta) */ - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); + AbstractLogicalOperator inputOp = + (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue(); - if (recordVar == null) { - /** - * For the case primary key-assignment expressions are constant - * expressions, find assign op that creates record to be - * inserted/deleted. 
- */ - while (fid != AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) { - if (op2.getInputs().size() == 0) { - return false; - } - op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue(); - if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) { - continue; - } - AssignOperator assignOp = (AssignOperator) op2; - ILogicalExpression assignExpr = assignOp.getExpressions().get(0).getValue(); - if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { - ScalarFunctionCallExpression funcExpr = - (ScalarFunctionCallExpression) assignOp.getExpressions().get(0).getValue(); - fid = funcExpr.getFunctionIdentifier(); - } + newRecordVar = getRecordVar(context, inputOp, newRecordExpr, 0); + if (newMetaExprs != null && !newMetaExprs.isEmpty()) { + if (newMetaExprs.size() > 1) { + throw new AlgebricksException( + "Number of meta records can't be more than 1. Number of meta records found = " + + newMetaExprs.size()); } - AssignOperator assignOp2 = (AssignOperator) op2; - recordVar = assignOp2.getVariables().get(0); + newMetaVar = getRecordVar(context, inputOp, newMetaExprs.get(0).getValue(), 1); } /* @@ -160,10 +129,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit * Note: We have two operators: * 1. An InsertDeleteOperator (primary) * 2. 
An IndexInsertDeleteOperator (secondary) - * The current insertOp is of the first type + * The current primaryIndexModificationOp is of the first type */ - AqlDataSource datasetSource = (AqlDataSource) insertOp.getDataSource(); + AqlDataSource datasetSource = (AqlDataSource) primaryIndexModificationOp.getDataSource(); AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider(); String dataverseName = datasetSource.getId().getDataverseName(); String datasetName = datasetSource.getId().getDatasourceName(); @@ -175,7 +144,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit return false; } - // Create operators for secondary index insert/delete. + // Create operators for secondary index insert / delete. String itemTypeName = dataset.getItemTypeName(); IAType itemType = mp.findType(dataset.getItemTypeDataverseName(), itemTypeName); if (itemType.getTypeTag() != ATypeTag.RECORD) { @@ -187,49 +156,35 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit if (dataset.hasMetaPart()) { metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); } - // recType might be replaced with enforced record type and we want to keep a reference to the original record - // type - ARecordType originalRecType = recType; List indexes = mp.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()); // Set the top operator pointer to the primary IndexInsertDeleteOperator - ILogicalOperator currentTop = op1; + ILogicalOperator currentTop = primaryIndexModificationOp; boolean hasSecondaryIndex = false; // Put an n-gram or a keyword index in the later stage of index-update, // since TokenizeOperator needs to be involved. 
- Collections.sort(indexes, new Comparator() { - @Override - public int compare(Index o1, Index o2) { - return o1.getIndexType().ordinal() - o2.getIndexType().ordinal(); - } - - }); - - // Check whether multiple indexes exist - int secondaryIndexTotalCnt = 0; - for (Index index : indexes) { - if (index.isSecondaryIndex()) { - secondaryIndexTotalCnt++; - } - } + Collections.sort(indexes, (o1, o2) -> o1.getIndexType().ordinal() - o2.getIndexType().ordinal()); // At this point, we have the data type info, and the indexes info as well + int secondaryIndexTotalCnt = indexes.size() - 1; if (secondaryIndexTotalCnt > 0) { - op0.getInputs().clear(); + sinkOp.getInputs().clear(); + } else { + return false; } // Initialize inputs to the SINK operator Op0 (The SINK) is now without input - // Prepare filtering field information (This is the filter created using the "filter with" key word in the // create dataset ddl) List filteringFields = ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField(); - List filteringVars = null; + List filteringVars; List> filteringExpressions = null; if (filteringFields != null) { // The filter field var already exists. 
we can simply get it from the insert op - filteringVars = new ArrayList(); - filteringExpressions = new ArrayList>(); - for (Mutable filteringExpression : insertOp.getAdditionalFilteringExpressions()) { + filteringVars = new ArrayList<>(); + filteringExpressions = new ArrayList<>(); + for (Mutable filteringExpression : primaryIndexModificationOp + .getAdditionalFilteringExpressions()) { filteringExpression.getValue().getUsedVariables(filteringVars); for (LogicalVariable var : filteringVars) { filteringExpressions @@ -237,51 +192,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit } } } - LogicalVariable enforcedRecordVar = recordVar; - - /* - * if the index is enforcing field types (For open indexes), We add a cast - * operator to ensure type safety - */ - if (insertOp.getOperation() == Kind.INSERT || insertOp.getOperation() == Kind.UPSERT) { - try { - DatasetDataSource ds = (DatasetDataSource) (insertOp.getDataSource()); - ARecordType insertRecType = (ARecordType) ds.getItemType(); - // create the expected record type = the original + the optional open field - ARecordType enforcedType = createEnforcedType(insertRecType, indexes); - if (!enforcedType.equals(insertRecType)) { - // A new variable which represents the casted record - LogicalVariable castedRecVar = context.newVar(); - context.addNotToBeInlinedVar(castedRecVar); - //introduce casting to enforced type - AbstractFunctionCallExpression castFunc = new ScalarFunctionCallExpression( - FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE)); - // The first argument is the record - castFunc.getArguments() - .add(new MutableObject(insertOp.getPayloadExpression().getValue())); - TypeCastUtils.setRequiredAndInputTypes(castFunc, enforcedType, insertRecType); - // AssignOperator puts in the cast var the casted record - AssignOperator castedRecordAssignOperator = - new AssignOperator(castedRecVar, new MutableObject(castFunc)); - // Connect the current top of the plan to the 
cast operator - castedRecordAssignOperator.getInputs().addAll(currentTop.getInputs()); - currentTop.getInputs().clear(); - currentTop.getInputs().add(new MutableObject<>(castedRecordAssignOperator)); - enforcedRecordVar = castedRecVar; - recType = enforcedType; - context.computeAndSetTypeEnvironmentForOperator(castedRecordAssignOperator); - context.computeAndSetTypeEnvironmentForOperator(currentTop); - // We don't need to cast the old rec, we just need an assignment function that extracts the SK - // and an expression which reference the new variables. - } - } catch (AsterixException e) { - throw new AlgebricksException(e); - } - } // Replicate Operator is applied only when doing the bulk-load. - AbstractLogicalOperator replicateOp = null; - if (secondaryIndexTotalCnt > 1 && insertOp.isBulkload()) { + ReplicateOperator replicateOp = null; + if (secondaryIndexTotalCnt > 1 && primaryIndexModificationOp.isBulkload()) { // Split the logical plan into "each secondary index update branch" // to replicate each pair. replicateOp = new ReplicateOperator(secondaryIndexTotalCnt); @@ -291,6 +205,50 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit currentTop = replicateOp; } + /* + * The two maps are used to store variables to which [casted] field access is assigned. + * One for the beforeOp record and the other for the new record. + * There are two uses for these maps: + * 1. used for shared fields in indexes with overlapping keys. + * 2. used for setting variables of secondary keys for each secondary index operator. 
+ */ + Map fieldVarsForBeforeOperation = new HashMap<>(); + Map fieldVarsForNewRecord = new HashMap<>(); + /* + * if the index is enforcing field types (For open indexes), We add a cast + * operator to ensure type safety + */ + try { + if (primaryIndexModificationOp.getOperation() == Kind.INSERT + || primaryIndexModificationOp.getOperation() == Kind.UPSERT + /* Actually, delete should not be here but it is now until issue + * https://issues.apache.org/jira/browse/ASTERIXDB-1507 + * is solved + */ + || primaryIndexModificationOp.getOperation() == Kind.DELETE) { + injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType, + metaType, newRecordVar, newMetaVar, primaryIndexModificationOp, false); + if (replicateOp != null) { + context.computeAndSetTypeEnvironmentForOperator(replicateOp); + } + } + if (primaryIndexModificationOp.getOperation() == Kind.UPSERT + /* Actually, delete should be here but it is not until issue + * https://issues.apache.org/jira/browse/ASTERIXDB-1507 + * is solved + */) { + List beforeOpMetaVars = + primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars(); + LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0); + currentTop = + injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation, recType, + metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar, + currentTop, true); + } + } catch (AsterixException e) { + throw new AlgebricksException(e); + } + // Iterate each secondary index and applying Index Update operations. 
// At first, op1 is the index insert op insertOp for (Index index : indexes) { @@ -300,88 +258,36 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit hasSecondaryIndex = true; // Get the secondary fields names and types List> secondaryKeyFields = index.getKeyFieldNames(); - List secondaryKeyTypes = index.getKeyFieldTypes(); - List secondaryKeyVars = new ArrayList(); - List indicators = index.getKeyFieldSourceIndicators(); - List> expressions = new ArrayList>(); - List> secondaryExpressions = new ArrayList>(); + List secondaryKeyVars = new ArrayList<>(); + List> secondaryExpressions = new ArrayList<>(); + List> beforeOpSecondaryExpressions = new ArrayList<>(); + ILogicalOperator replicateOutput; for (int i = 0; i < secondaryKeyFields.size(); i++) { - List secondaryKey = secondaryKeyFields.get(i); - ARecordType sourceType = recType; - LogicalVariable sourceVar = enforcedRecordVar; - if (dataset.hasMetaPart()) { - sourceType = indicators.get(i).intValue() == 0 ? recType : metaType; - sourceVar = indicators.get(i).intValue() == 0 ? enforcedRecordVar : metaVar; + IndexFieldId indexFieldId = + new IndexFieldId(index.getKeyFieldSourceIndicators().get(i), secondaryKeyFields.get(i)); + LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId); + secondaryKeyVars.add(skVar); + secondaryExpressions.add(new MutableObject( + new VariableReferenceExpression(skVar))); + if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) { + beforeOpSecondaryExpressions.add(new MutableObject( + new VariableReferenceExpression(fieldVarsForBeforeOperation.get(indexFieldId)))); } - prepareVarAndExpression(secondaryKey, sourceType.getFieldNames(), sourceVar, expressions, - secondaryKeyVars, context); - } - // Used with upsert operation - // in case of upsert, we need vars and expressions for the old SK as well. 
- List prevSecondaryKeyVars = null; - List> prevExpressions = null; - List> prevSecondaryExpressions = null; - AssignOperator prevSecondaryKeyAssign = null; - if (insertOp.getOperation() == Kind.UPSERT) { - prevSecondaryKeyVars = new ArrayList(); - prevExpressions = new ArrayList>(); - prevSecondaryExpressions = new ArrayList>(); - for (int i = 0; i < secondaryKeyFields.size(); i++) { - List secondaryKey = secondaryKeyFields.get(i); - prepareVarAndExpression(secondaryKey, - (indicators.get(i).intValue() == 0) ? originalRecType.getFieldNames() - : metaType.getFieldNames(), - (indicators.get(i).intValue() == 0) ? insertOp.getPrevRecordVar() - : insertOp.getPrevAdditionalNonFilteringVars().get(0), - prevExpressions, prevSecondaryKeyVars, context); - } - prevSecondaryKeyAssign = new AssignOperator(prevSecondaryKeyVars, prevExpressions); - } - AssignOperator assign = new AssignOperator(secondaryKeyVars, expressions); - AssignOperator topAssign = assign; - if (insertOp.getOperation() == Kind.UPSERT) { - prevSecondaryKeyAssign.getInputs().add(new MutableObject(topAssign)); - topAssign = prevSecondaryKeyAssign; - } - // Only apply replicate operator when doing bulk-load - if (secondaryIndexTotalCnt > 1 && insertOp.isBulkload()) { - assign.getInputs().add(new MutableObject(replicateOp)); - } else { - assign.getInputs().add(new MutableObject(currentTop)); } - context.computeAndSetTypeEnvironmentForOperator(assign); - if (insertOp.getOperation() == Kind.UPSERT) { - context.computeAndSetTypeEnvironmentForOperator(prevSecondaryKeyAssign); - } - currentTop = topAssign; - - // in case of an Upsert operation, the currentTop is an assign which has the old secondary keys + the new secondary keys - if (index.getIndexType() == IndexType.BTREE || index.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX - || index.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX - || index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX - || index.getIndexType() == 
IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { + IndexInsertDeleteUpsertOperator indexUpdate; + if (index.getIndexType() != IndexType.RTREE) { // Create an expression per key - for (LogicalVariable secondaryKeyVar : secondaryKeyVars) { - secondaryExpressions.add( - new MutableObject(new VariableReferenceExpression(secondaryKeyVar))); - } - Mutable filterExpression = null; - if (insertOp.getOperation() == Kind.UPSERT) { - for (LogicalVariable oldSecondaryKeyVar : prevSecondaryKeyVars) { - prevSecondaryExpressions.add(new MutableObject( - new VariableReferenceExpression(oldSecondaryKeyVar))); - } - } else { - filterExpression = createFilterExpression(secondaryKeyVars, - context.getOutputTypeEnvironment(currentTop), false); - } + Mutable filterExpression = + (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null + : createFilterExpression(secondaryKeyVars, + context.getOutputTypeEnvironment(currentTop), index.isEnforcingKeyFileds()); AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp); // Introduce the TokenizeOperator only when doing bulk-load, // and index type is keyword or n-gram. - if (index.getIndexType() != IndexType.BTREE && insertOp.isBulkload()) { + if (index.getIndexType() != IndexType.BTREE && primaryIndexModificationOp.isBulkload()) { // Note: Bulk load case, we don't need to take care of it for upsert operation // Check whether the index is length-partitioned or not. // If partitioned, [input variables to TokenizeOperator, @@ -391,27 +297,25 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit // and fed into the IndexInsertDeleteOperator. // Input variables are passed since TokenizeOperator is not an // filtering operator. 
- boolean isPartitioned = false; - if (index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX - || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { - isPartitioned = true; - } + boolean isPartitioned = index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX + || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX; // Create a new logical variable - token - List tokenizeKeyVars = new ArrayList(); - List> tokenizeKeyExprs = new ArrayList>(); + List tokenizeKeyVars = new ArrayList<>(); + List> tokenizeKeyExprs = new ArrayList<>(); LogicalVariable tokenVar = context.newVar(); tokenizeKeyVars.add(tokenVar); tokenizeKeyExprs .add(new MutableObject(new VariableReferenceExpression(tokenVar))); // Check the field type of the secondary key. - IAType secondaryKeyType = null; - Pair keyPairType = - Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recType); + IAType secondaryKeyType; + Pair keyPairType = Index.getNonNullableOpenFieldType( + index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0), + recType); secondaryKeyType = keyPairType.first; - List varTypes = new ArrayList(); + List varTypes = new ArrayList<>(); varTypes.add(NonTaggedFormatUtil.getTokenType(secondaryKeyType)); // If the index is a length-partitioned, then create @@ -428,64 +332,53 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit // TokenizeOperator to tokenize [SK, PK] pairs TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex, - insertOp.getPrimaryKeyExpressions(), secondaryExpressions, tokenizeKeyVars, - filterExpression, insertOp.getOperation(), insertOp.isBulkload(), isPartitioned, varTypes); - tokenUpdate.getInputs().add(new MutableObject(assign)); + primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions, + tokenizeKeyVars, + filterExpression, primaryIndexModificationOp.getOperation(), + primaryIndexModificationOp.isBulkload(), isPartitioned, varTypes); + 
tokenUpdate.getInputs().add(new MutableObject(currentTop)); context.computeAndSetTypeEnvironmentForOperator(tokenUpdate); - - IndexInsertDeleteUpsertOperator indexUpdate = - new IndexInsertDeleteUpsertOperator(dataSourceIndex, insertOp.getPrimaryKeyExpressions(), - tokenizeKeyExprs, filterExpression, insertOp.getOperation(), insertOp.isBulkload(), - insertOp.getAdditionalNonFilteringExpressions() == null ? 0 - : insertOp.getAdditionalNonFilteringExpressions().size()); + replicateOutput = tokenUpdate; + indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex, + primaryIndexModificationOp.getPrimaryKeyExpressions(), tokenizeKeyExprs, filterExpression, + primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(), + primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0 + : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size()); indexUpdate.setAdditionalFilteringExpressions(filteringExpressions); indexUpdate.getInputs().add(new MutableObject(tokenUpdate)); - - context.computeAndSetTypeEnvironmentForOperator(indexUpdate); - - currentTop = indexUpdate; - op0.getInputs().add(new MutableObject(currentTop)); - } else { // When TokenizeOperator is not needed - IndexInsertDeleteUpsertOperator indexUpdate = - new IndexInsertDeleteUpsertOperator(dataSourceIndex, insertOp.getPrimaryKeyExpressions(), - secondaryExpressions, filterExpression, insertOp.getOperation(), - insertOp.isBulkload(), insertOp.getAdditionalNonFilteringExpressions() == null ? 0 - : insertOp.getAdditionalNonFilteringExpressions().size()); - + indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex, + primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions, + filterExpression, + primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(), + primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 
0 + : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size()); indexUpdate.setAdditionalFilteringExpressions(filteringExpressions); + replicateOutput = indexUpdate; // We add the necessary expressions for upsert - if (insertOp.getOperation() == Kind.UPSERT) { - indexUpdate.setPrevSecondaryKeyExprs(prevSecondaryExpressions); + if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) { + indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions); if (filteringFields != null) { - indexUpdate.setPrevAdditionalFilteringExpression(new MutableObject( - new VariableReferenceExpression(insertOp.getPrevFilterVar()))); + indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject( + new VariableReferenceExpression( + primaryIndexModificationOp.getBeforeOpFilterVar()))); } } indexUpdate.getInputs().add(new MutableObject(currentTop)); - - currentTop = indexUpdate; - context.computeAndSetTypeEnvironmentForOperator(indexUpdate); - - if (insertOp.isBulkload()) { - op0.getInputs().add(new MutableObject(currentTop)); - } - } - - } else if (index.getIndexType() == IndexType.RTREE) { + } else { // Get type, dimensions and number of keys - Pair keyPairType = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType); + Pair keyPairType = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), + secondaryKeyFields.get(0), recType); IAType spatialType = keyPairType.first; - boolean isPointMBR = - spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D; + boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT + || spatialType.getTypeTag() == ATypeTag.POINT3D; int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); int numKeys = (isPointMBR && isBulkload) ? 
dimension : dimension * 2; // Get variables and expressions - List keyVarList = new ArrayList(); - List> keyExprList = new ArrayList>(); + List keyVarList = new ArrayList<>(); + List> keyExprList = new ArrayList<>(); for (int i = 0; i < numKeys; i++) { LogicalVariable keyVar = context.newVar(); keyVarList.add(keyVar); @@ -499,6 +392,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit new ConstantExpression(new AsterixConstantValue(new AInt32(i))))); keyExprList.add(new MutableObject(createMBR)); } + secondaryExpressions.clear(); for (LogicalVariable secondaryKeyVar : keyVarList) { secondaryExpressions.add( new MutableObject(new VariableReferenceExpression(secondaryKeyVar))); @@ -514,11 +408,12 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList); assignCoordinates.getInputs().add(new MutableObject(currentTop)); context.computeAndSetTypeEnvironmentForOperator(assignCoordinates); + replicateOutput = assignCoordinates; Mutable filterExpression = null; AssignOperator originalAssignCoordinates = null; - // We do something similar for previous key if the operation is an upsert - if (insertOp.getOperation() == Kind.UPSERT) { - List originalKeyVarList = new ArrayList(); + // We do something similar for beforeOp key if the operation is an upsert + if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) { + List originalKeyVarList = new ArrayList<>(); List> originalKeyExprList = new ArrayList<>(); // we don't do any filtering since nulls are expected here and there for (int i = 0; i < numKeys; i++) { @@ -526,26 +421,18 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit originalKeyVarList.add(keyVar); AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression( FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CREATE_MBR)); - createMBR.getArguments().add(new MutableObject( - 
new VariableReferenceExpression(prevSecondaryKeyVars.get(0)))); + createMBR.getArguments().add(beforeOpSecondaryExpressions.get(0)); createMBR.getArguments().add(new MutableObject( new ConstantExpression(new AsterixConstantValue(new AInt32(dimension))))); createMBR.getArguments().add(new MutableObject( new ConstantExpression(new AsterixConstantValue(new AInt32(i))))); originalKeyExprList.add(new MutableObject(createMBR)); } + beforeOpSecondaryExpressions.clear(); for (LogicalVariable secondaryKeyVar : originalKeyVarList) { - prevSecondaryExpressions.add(new MutableObject( + beforeOpSecondaryExpressions.add(new MutableObject( new VariableReferenceExpression(secondaryKeyVar))); } - if (isPointMBR && isBulkload) { - //for PointMBR optimization: see SecondaryRTreeOperationsHelper.buildLoadingJobSpec() and - //createFieldPermutationForBulkLoadOp(int) for more details. - for (LogicalVariable secondaryKeyVar : originalKeyVarList) { - prevSecondaryExpressions.add(new MutableObject( - new VariableReferenceExpression(secondaryKeyVar))); - } - } originalAssignCoordinates = new AssignOperator(originalKeyVarList, originalKeyExprList); originalAssignCoordinates.getInputs().add(new MutableObject(assignCoordinates)); context.computeAndSetTypeEnvironmentForOperator(originalAssignCoordinates); @@ -557,33 +444,35 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit context.getOutputTypeEnvironment(assignCoordinates), forceFilter); } AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp); - IndexInsertDeleteUpsertOperator indexUpdate = - new IndexInsertDeleteUpsertOperator(dataSourceIndex, insertOp.getPrimaryKeyExpressions(), - secondaryExpressions, filterExpression, insertOp.getOperation(), insertOp.isBulkload(), - insertOp.getAdditionalNonFilteringExpressions() == null ? 
0 - : insertOp.getAdditionalNonFilteringExpressions().size()); + indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex, + primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression, + primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(), + primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0 + : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size()); indexUpdate.setAdditionalFilteringExpressions(filteringExpressions); - if (insertOp.getOperation() == Kind.UPSERT) { - // set old secondary key expressions + if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) { + // set before op secondary key expressions if (filteringFields != null) { - indexUpdate.setPrevAdditionalFilteringExpression(new MutableObject( - new VariableReferenceExpression(insertOp.getPrevFilterVar()))); + indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject( + new VariableReferenceExpression(primaryIndexModificationOp.getBeforeOpFilterVar()))); } // set filtering expressions - indexUpdate.setPrevSecondaryKeyExprs(prevSecondaryExpressions); - // assign --> assign previous values --> secondary index upsert + indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions); + // assign --> assign beforeOp values --> secondary index upsert indexUpdate.getInputs().add(new MutableObject(originalAssignCoordinates)); } else { indexUpdate.getInputs().add(new MutableObject(assignCoordinates)); } + } + context.computeAndSetTypeEnvironmentForOperator(indexUpdate); + if (!primaryIndexModificationOp.isBulkload() || secondaryIndexTotalCnt == 1) { currentTop = indexUpdate; - context.computeAndSetTypeEnvironmentForOperator(indexUpdate); - - if (insertOp.isBulkload()) { - // For bulk load, we connect all fanned out insert operator to a single SINK operator - op0.getInputs().add(new MutableObject(currentTop)); - } - + } else { + replicateOp.getOutputs().add(new 
MutableObject<>(replicateOutput)); + } + if (primaryIndexModificationOp.isBulkload()) { + // For bulk load, we connect all fanned out insert operator to a single SINK operator + sinkOp.getInputs().add(new MutableObject(indexUpdate)); } } @@ -591,183 +480,181 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit return false; } - if (!insertOp.isBulkload()) { + if (!primaryIndexModificationOp.isBulkload()) { // If this is an upsert, we need to // Remove the current input to the SINK operator (It is actually already removed above) - op0.getInputs().clear(); + sinkOp.getInputs().clear(); // Connect the last index update to the SINK - op0.getInputs().add(new MutableObject(currentTop)); + sinkOp.getInputs().add(new MutableObject(currentTop)); } return true; } - // Merges typed index fields with specified recordType, allowing indexed fields to be optional. - // I.e. the type { "personId":int32, "name": string, "address" : { "street": string } } with typed indexes on age:int32, address.state:string - // will be merged into type { "personId":int32, "name": string, "age": int32? "address" : { "street": string, "state": string? } } - // Used by open indexes to enforce the type of an indexed record - public static ARecordType createEnforcedType(ARecordType initialType, List indexes) - throws AsterixException, AlgebricksException { - ARecordType enforcedType = initialType; + private LogicalVariable getRecordVar(IOptimizationContext context, AbstractLogicalOperator inputOp, + ILogicalExpression recordExpr, + int expectedRecordIndex) throws AlgebricksException { + if (exprIsRecord(context.getOutputTypeEnvironment(inputOp), recordExpr)) { + return ((VariableReferenceExpression) recordExpr).getVariableReference(); + } else { + /** + * For the case primary key-assignment expressions are constant + * expressions, find assign op that creates record to be + * inserted/deleted. 
+ */ + FunctionIdentifier fid = null; + AbstractLogicalOperator currentInputOp = inputOp; + while (fid != AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) { + if (currentInputOp.getInputs().isEmpty()) { + return null; + } + currentInputOp = (AbstractLogicalOperator) currentInputOp.getInputs().get(0).getValue(); + if (currentInputOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) { + continue; + } + AssignOperator assignOp = (AssignOperator) currentInputOp; + ILogicalExpression assignExpr = assignOp.getExpressions().get(expectedRecordIndex).getValue(); + if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { + ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) assignOp.getExpressions() + .get(expectedRecordIndex).getValue(); + fid = funcExpr.getFunctionIdentifier(); + } + } + return ((AssignOperator) currentInputOp).getVariables().get(0); + } + } + + private boolean exprIsRecord(IVariableTypeEnvironment typeEnvironment, ILogicalExpression recordExpr) + throws AlgebricksException { + if (recordExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) { + IAType type = (IAType) typeEnvironment.getType(recordExpr); + return type != null && type.getTypeTag() == ATypeTag.RECORD; + } + return false; + } + + private ILogicalOperator injectFieldAccessesForIndexes(IOptimizationContext context, Dataset dataset, + List indexes, Map fieldAccessVars, ARecordType recType, + ARecordType metaType, LogicalVariable recordVar, LogicalVariable metaVar, ILogicalOperator currentTop, + boolean afterOp) throws AlgebricksException { + List vars = new ArrayList<>(); + List> exprs = new ArrayList<>(); for (Index index : indexes) { - if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) { + if (index.isPrimaryIndex()) { continue; } - if (index.hasMetaFields()) { - throw new AlgebricksException("Indexing an open field is only supported on the record part"); - } + List skTypes = index.getKeyFieldTypes(); + List> skNames = index.getKeyFieldNames(); 
+ List indicators = index.getKeyFieldSourceIndicators(); for (int i = 0; i < index.getKeyFieldNames().size(); i++) { - Stack> nestedTypeStack = new Stack>(); - List splits = index.getKeyFieldNames().get(i); - ARecordType nestedFieldType = enforcedType; - boolean openRecords = false; - String bridgeName = nestedFieldType.getTypeName(); - int j; - // Build the stack for the enforced type - for (j = 1; j < splits.size(); j++) { - nestedTypeStack.push(new Pair(nestedFieldType, splits.get(j - 1))); - bridgeName = nestedFieldType.getTypeName(); - nestedFieldType = (ARecordType) enforcedType.getSubFieldType(splits.subList(0, j)); - if (nestedFieldType == null) { - openRecords = true; - break; - } + IndexFieldId indexFieldId = new IndexFieldId(indicators.get(i), skNames.get(i)); + if (fieldAccessVars.containsKey(indexFieldId)) { + // already handled in a different index + continue; } - if (openRecords == true) { - // create the smallest record - enforcedType = new ARecordType(splits.get(splits.size() - 2), - new String[] { splits.get(splits.size() - 1) }, - new IAType[] { AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i)) }, true); - // create the open part of the nested field - for (int k = splits.size() - 3; k > (j - 2); k--) { - enforcedType = new ARecordType(splits.get(k), new String[] { splits.get(k + 1) }, - new IAType[] { AUnionType.createUnknownableType(enforcedType) }, true); - } - // Bridge the gap - Pair gapPair = nestedTypeStack.pop(); - ARecordType parent = gapPair.first; - - IAType[] parentFieldTypes = ArrayUtils.addAll(parent.getFieldTypes().clone(), - new IAType[] { AUnionType.createUnknownableType(enforcedType) }); - enforcedType = new ARecordType(bridgeName, - ArrayUtils.addAll(parent.getFieldNames(), enforcedType.getTypeName()), parentFieldTypes, - true); + ARecordType sourceType = dataset.hasMetaPart() + ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? 
recType : metaType : recType; + LogicalVariable sourceVar = dataset.hasMetaPart() + ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar + : recordVar; + LogicalVariable fieldVar = context.newVar(); + // create record variable ref + Mutable varRef = + new MutableObject<>(new VariableReferenceExpression(sourceVar)); + IAType fieldType = sourceType.getSubFieldType(indexFieldId.fieldName); + AbstractFunctionCallExpression theFieldAccessFunc; + if (fieldType == null) { + // Open field. must prevent inlining to maintain the cast before the primaryOp and + // make handling of records with incorrect value type for this field easier and cleaner + context.addNotToBeInlinedVar(fieldVar); + // create field access + AbstractFunctionCallExpression fieldAccessFunc = + getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName); + // create cast + theFieldAccessFunc = new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE)); + // The first argument is the field + theFieldAccessFunc.getArguments() + .add(new MutableObject(fieldAccessFunc)); + TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i), + BuiltinType.ANY); } else { - //Schema is closed all the way to the field - //enforced fields are either null or strongly typed - LinkedHashMap recordNameTypesMap = createRecordNameTypeMap(nestedFieldType); - // if a an enforced field already exists and the type is correct - IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1)); - if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION - && ((AUnionType) enforcedFieldType).isUnknownableType()) { - enforcedFieldType = ((AUnionType) enforcedFieldType).getActualType(); - } - if (enforcedFieldType != null && !ATypeHierarchy.canPromote(enforcedFieldType.getTypeTag(), - index.getKeyFieldTypes().get(i).getTypeTag())) { - throw new AlgebricksException("Cannot enforce field " + 
index.getKeyFieldNames().get(i) - + " to have type " + index.getKeyFieldTypes().get(i)); - } - if (enforcedFieldType == null) { - recordNameTypesMap.put(splits.get(splits.size() - 1), - AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i))); - } - enforcedType = new ARecordType(nestedFieldType.getTypeName(), - recordNameTypesMap.keySet().toArray(new String[recordNameTypesMap.size()]), - recordNameTypesMap.values().toArray(new IAType[recordNameTypesMap.size()]), - nestedFieldType.isOpen()); - } - - // Create the enforcedtype for the nested fields in the schema, from the ground up - if (nestedTypeStack.size() > 0) { - while (!nestedTypeStack.isEmpty()) { - Pair nestedTypePair = nestedTypeStack.pop(); - ARecordType nestedRecType = nestedTypePair.first; - IAType[] nestedRecTypeFieldTypes = nestedRecType.getFieldTypes().clone(); - nestedRecTypeFieldTypes[nestedRecType.getFieldIndex(nestedTypePair.second)] = enforcedType; - enforcedType = new ARecordType(nestedRecType.getTypeName() + "_enforced", - nestedRecType.getFieldNames(), nestedRecTypeFieldTypes, nestedRecType.isOpen()); - } + // Get the desired field position + int pos = indexFieldId.fieldName.size() > 1 ? -1 + : sourceType.getFieldIndex(indexFieldId.fieldName.get(0)); + // Field not found --> This is either an open field or a nested field. it can't be accessed by index + theFieldAccessFunc = + (pos == -1) ? 
getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName) + : getClosedFieldAccessFunction(varRef, pos); } + vars.add(fieldVar); + exprs.add(new MutableObject(theFieldAccessFunc)); + fieldAccessVars.put(indexFieldId, fieldVar); } } - return enforcedType; + // AssignOperator assigns secondary keys to their vars + AssignOperator castedFieldAssignOperator = new AssignOperator(vars, exprs); + return introduceNewOp(context, currentTop, castedFieldAssignOperator, afterOp); } - private static LinkedHashMap createRecordNameTypeMap(ARecordType nestedFieldType) { - LinkedHashMap recordNameTypesMap = new LinkedHashMap<>(); - for (int j = 0; j < nestedFieldType.getFieldNames().length; j++) { - recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]); + private static ILogicalOperator introduceNewOp(IOptimizationContext context, ILogicalOperator currentTopOp, + ILogicalOperator newOp, boolean afterOp) throws AlgebricksException { + if (afterOp) { + newOp.getInputs().add(new MutableObject<>(currentTopOp)); + context.computeAndSetTypeEnvironmentForOperator(newOp); + return newOp; + } else { + newOp.getInputs().addAll(currentTopOp.getInputs()); + currentTopOp.getInputs().clear(); + currentTopOp.getInputs().add(new MutableObject<>(newOp)); + context.computeAndSetTypeEnvironmentForOperator(newOp); + context.computeAndSetTypeEnvironmentForOperator(currentTopOp); + return currentTopOp; } - return recordNameTypesMap; } - /*** - * This method takes a list of {fields}: a subset of {recordFields}, the original record variable - * and populate expressions with expressions which evaluate to those fields (using field access functions) and - * variables to represent them - * - * @param fields - * desired fields - * @param recordFields - * all the record fields - * @param recordVar - * the record variable - * @param expressions - * @param vars - * @param context - * @throws AlgebricksException - */ - @SuppressWarnings("unchecked") - private void 
prepareVarAndExpression(List fields, String[] recordFields, LogicalVariable recordVar, - List> expressions, List vars, IOptimizationContext context) - throws AlgebricksException { - // Get a reference to the record variable - Mutable varRef = new MutableObject<>(new VariableReferenceExpression(recordVar)); - // Get the desired field position - int pos = -1; - if (fields.size() == 1) { - for (int j = 0; j < recordFields.length; j++) { - if (recordFields[j].equals(fields.get(0))) { - pos = j; - break; - } - } - } - // Field not found --> This is either an open field or a nested field. it can't be accessed by index - AbstractFunctionCallExpression func; - if (pos == -1) { - if (fields.size() > 1) { - AOrderedList fieldList = new AOrderedList(new AOrderedListType(BuiltinType.ASTRING, null)); - for (int i = 0; i < fields.size(); i++) { - fieldList.add(new AString(fields.get(i))); - } - Mutable fieldRef = new MutableObject( - new ConstantExpression(new AsterixConstantValue(fieldList))); - // Create an expression for the nested case - func = new ScalarFunctionCallExpression( - FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED), varRef, fieldRef); - } else { - Mutable fieldRef = new MutableObject( - new ConstantExpression(new AsterixConstantValue(new AString(fields.get(0))))); - // Create an expression for the open field case (By name) - func = new ScalarFunctionCallExpression( - FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef); - } + private static AbstractFunctionCallExpression getClosedFieldAccessFunction(Mutable varRef, + int position) { + Mutable indexRef = new MutableObject<>( + new ConstantExpression(new AsterixConstantValue(new AInt32(position)))); + return new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX), varRef, indexRef); + } + + private static AbstractFunctionCallExpression getOpenOrNestedFieldAccessFunction(Mutable varRef, + List 
fields) { + ScalarFunctionCallExpression func; + if (fields.size() > 1) { + IAObject fieldList = stringListToAOrderedList(fields); + Mutable fieldRef = constantToMutableLogicalExpression(fieldList); + // Create an expression for the nested case + func = new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED), varRef, fieldRef); } else { - // Assumes the indexed field is in the closed portion of the type. - Mutable indexRef = new MutableObject( - new ConstantExpression(new AsterixConstantValue(new AInt32(pos)))); + IAObject fieldList = new AString(fields.get(0)); + Mutable fieldRef = constantToMutableLogicalExpression(fieldList); + // Create an expression for the open field case (By name) func = new ScalarFunctionCallExpression( - FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX), varRef, indexRef); + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef); + } + return func; + } + + private static AOrderedList stringListToAOrderedList(List fields) { + AOrderedList fieldList = new AOrderedList(new AOrderedListType(BuiltinType.ASTRING, null)); + for (int i = 0; i < fields.size(); i++) { + fieldList.add(new AString(fields.get(i))); } - expressions.add(new MutableObject(func)); - LogicalVariable newVar = context.newVar(); - vars.add(newVar); + return fieldList; + } + + private static Mutable constantToMutableLogicalExpression(IAObject constantObject) { + return new MutableObject<>( + new ConstantExpression(new AsterixConstantValue(constantObject))); } - @SuppressWarnings("unchecked") private Mutable createFilterExpression(List secondaryKeyVars, IVariableTypeEnvironment typeEnv, boolean forceFilter) throws AlgebricksException { - List> filterExpressions = new ArrayList>(); + List> filterExpressions = new ArrayList<>(); // Add 'is not null' to all nullable secondary index keys as a filtering // condition. 
for (LogicalVariable secondaryKeyVar : secondaryKeyVars) { @@ -775,26 +662,50 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit if (!NonTaggedFormatUtil.isOptional(secondaryKeyType) && !forceFilter) { continue; } - ScalarFunctionCallExpression isUnknownFuncExpr = - new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_UNKOWN), - new MutableObject(new VariableReferenceExpression(secondaryKeyVar))); - ScalarFunctionCallExpression notFuncExpr = - new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT), - new MutableObject(isUnknownFuncExpr)); + ScalarFunctionCallExpression isUnknownFuncExpr = new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_UNKOWN), + new MutableObject(new VariableReferenceExpression(secondaryKeyVar))); + ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT), + new MutableObject(isUnknownFuncExpr)); filterExpressions.add(new MutableObject(notFuncExpr)); } // No nullable secondary keys. if (filterExpressions.isEmpty()) { return null; } - Mutable filterExpression = null; + Mutable filterExpression; if (filterExpressions.size() > 1) { // Create a conjunctive condition. 
- filterExpression = new MutableObject(new ScalarFunctionCallExpression( + filterExpression = new MutableObject<>(new ScalarFunctionCallExpression( FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.AND), filterExpressions)); } else { filterExpression = filterExpressions.get(0); } return filterExpression; } + + private class IndexFieldId { + private int indicator; + private List fieldName; + + public IndexFieldId(int indicator, List fieldName) { + this.indicator = indicator; + this.fieldName = fieldName; + } + + @Override + public int hashCode() { + return 31 * indicator + fieldName.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof IndexFieldId) { + IndexFieldId oIndexFieldId = (IndexFieldId) o; + return indicator == oIndexFieldId.indicator && fieldName.equals(oIndexFieldId.fieldName); + } + return false; + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/534d5892/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java index 8aefd1a..0c36d0b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java @@ -110,7 +110,7 @@ public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule { FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING)); // argument is the previous record isPrevMissingFunc.getArguments().add(new MutableObject( - new VariableReferenceExpression(insertDeleteUpsertOperator.getPrevRecordVar()))); + new 
VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar()))); orFunc.getArguments().add(new MutableObject(isPrevMissingFunc)); orFunc.getArguments().add(new MutableObject(isNewMissingFunc)); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/534d5892/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java index eac35cd..ba79534 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/typecast/StaticTypeCastUtil.java @@ -506,9 +506,10 @@ public class StaticTypeCastUtil { * the expression reference * @param argExpr * the original expression + * @throws AlgebricksException */ private static void injectCastFunction(IFunctionInfo funcInfo, IAType reqType, IAType inputType, - Mutable exprRef, ILogicalExpression argExpr) { + Mutable exprRef, ILogicalExpression argExpr) throws AlgebricksException { ScalarFunctionCallExpression cast = new ScalarFunctionCallExpression(funcInfo); cast.getArguments().add(new MutableObject(argExpr)); exprRef.setValue(cast); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/534d5892/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b430807..90bf6f6 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -25,10 +25,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.rmi.RemoteException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Date; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -152,9 +155,10 @@ import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.MetadataLockManager; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; -import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule; +import org.apache.asterix.om.types.hierachy.ATypeHierarchy; import org.apache.asterix.runtime.util.AsterixAppContextInfo; import org.apache.asterix.runtime.util.AsterixClusterProperties; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; @@ -175,6 +179,7 @@ import org.apache.asterix.translator.TypeTranslator; import org.apache.asterix.translator.util.ValidateUtil; import org.apache.asterix.util.FlushDatasetUtils; import org.apache.asterix.util.JobUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; @@ -976,7 +981,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ARecordType enforcedType = null; if (stmtCreateIndex.isEnforced()) { - enforcedType = 
IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, + enforcedType = createEnforcedType(aRecordType, Lists.newArrayList(index)); } @@ -2473,7 +2478,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen dataverseName); jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider)); ARecordType aRecordType = (ARecordType) dt.getDatatype(); - ARecordType enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType( + ARecordType enforcedType = createEnforcedType( aRecordType, indexes); if (ds.getDatasetType() == DatasetType.INTERNAL) { for (int j = 0; j < indexes.size(); j++) { @@ -3124,4 +3129,105 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen rewriter.rewrite(stmt); } + /* + * Merges typed index fields with specified recordType, allowing indexed fields to be optional. + * I.e. the type { "personId":int32, "name": string, "address" : { "street": string } } with typed indexes + * on age:int32, address.state:string will be merged into type { "personId":int32, "name": string, + * "age": int32? "address" : { "street": string, "state": string? 
} } Used by open indexes to enforce + * the type of an indexed record + */ + private static ARecordType createEnforcedType(ARecordType initialType, List indexes) + throws AlgebricksException { + ARecordType enforcedType = initialType; + for (Index index : indexes) { + if (!index.isSecondaryIndex() || !index.isEnforcingKeyFileds()) { + continue; + } + if (index.hasMetaFields()) { + throw new AlgebricksException("Indexing an open field is only supported on the record part"); + } + for (int i = 0; i < index.getKeyFieldNames().size(); i++) { + Deque> nestedTypeStack = new ArrayDeque<>(); + List splits = index.getKeyFieldNames().get(i); + ARecordType nestedFieldType = enforcedType; + boolean openRecords = false; + String bridgeName = nestedFieldType.getTypeName(); + int j; + // Build the stack for the enforced type + for (j = 1; j < splits.size(); j++) { + nestedTypeStack.push(new Pair(nestedFieldType, splits.get(j - 1))); + bridgeName = nestedFieldType.getTypeName(); + nestedFieldType = (ARecordType) enforcedType.getSubFieldType(splits.subList(0, j)); + if (nestedFieldType == null) { + openRecords = true; + break; + } + } + if (openRecords) { + // create the smallest record + enforcedType = new ARecordType(splits.get(splits.size() - 2), + new String[] { splits.get(splits.size() - 1) }, + new IAType[] { AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i)) }, true); + // create the open part of the nested field + for (int k = splits.size() - 3; k > (j - 2); k--) { + enforcedType = new ARecordType(splits.get(k), new String[] { splits.get(k + 1) }, + new IAType[] { AUnionType.createUnknownableType(enforcedType) }, true); + } + // Bridge the gap + Pair gapPair = nestedTypeStack.pop(); + ARecordType parent = gapPair.first; + + IAType[] parentFieldTypes = ArrayUtils.addAll(parent.getFieldTypes().clone(), + new IAType[] { AUnionType.createUnknownableType(enforcedType) }); + enforcedType = new ARecordType(bridgeName, + ArrayUtils.addAll(parent.getFieldNames(), 
enforcedType.getTypeName()), parentFieldTypes, + true); + } else { + //Schema is closed all the way to the field + //enforced fields are either null or strongly typed + LinkedHashMap recordNameTypesMap = createRecordNameTypeMap(nestedFieldType); + // if a an enforced field already exists and the type is correct + IAType enforcedFieldType = recordNameTypesMap.get(splits.get(splits.size() - 1)); + if (enforcedFieldType != null && enforcedFieldType.getTypeTag() == ATypeTag.UNION + && ((AUnionType) enforcedFieldType).isUnknownableType()) { + enforcedFieldType = ((AUnionType) enforcedFieldType).getActualType(); + } + if (enforcedFieldType != null && !ATypeHierarchy.canPromote(enforcedFieldType.getTypeTag(), + index.getKeyFieldTypes().get(i).getTypeTag())) { + throw new AlgebricksException("Cannot enforce field " + index.getKeyFieldNames().get(i) + + " to have type " + index.getKeyFieldTypes().get(i)); + } + if (enforcedFieldType == null) { + recordNameTypesMap.put(splits.get(splits.size() - 1), + AUnionType.createUnknownableType(index.getKeyFieldTypes().get(i))); + } + enforcedType = new ARecordType(nestedFieldType.getTypeName(), + recordNameTypesMap.keySet().toArray(new String[recordNameTypesMap.size()]), + recordNameTypesMap.values().toArray(new IAType[recordNameTypesMap.size()]), + nestedFieldType.isOpen()); + } + + // Create the enforced type for the nested fields in the schema, from the ground up + if (!nestedTypeStack.isEmpty()) { + while (!nestedTypeStack.isEmpty()) { + Pair nestedTypePair = nestedTypeStack.pop(); + ARecordType nestedRecType = nestedTypePair.first; + IAType[] nestedRecTypeFieldTypes = nestedRecType.getFieldTypes().clone(); + nestedRecTypeFieldTypes[nestedRecType.getFieldIndex(nestedTypePair.second)] = enforcedType; + enforcedType = new ARecordType(nestedRecType.getTypeName() + "_enforced", + nestedRecType.getFieldNames(), nestedRecTypeFieldTypes, nestedRecType.isOpen()); + } + } + } + } + return enforcedType; + } + + private static LinkedHashMap 
createRecordNameTypeMap(ARecordType nestedFieldType) { + LinkedHashMap recordNameTypesMap = new LinkedHashMap<>(); + for (int j = 0; j < nestedFieldType.getFieldNames().length; j++) { + recordNameTypesMap.put(nestedFieldType.getFieldNames()[j], nestedFieldType.getFieldTypes()[j]); + } + return recordNameTypesMap; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/534d5892/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan index c65c71c..4e6eef4 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/disjunction-to-join-delete-1.plan @@ -4,18 +4,17 @@ -- INDEX_INSERT_DELETE |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| - -- ASSIGN |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- INSERT_DELETE |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- MATERIALIZE |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- ASSIGN |PARTITIONED| - -- STREAM_PROJECT |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- BTREE_SEARCH |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$11(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$11] |PARTITIONED| - -- UNNEST |UNPARTITIONED| - -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- INSERT_DELETE |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- MATERIALIZE |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- BTREE_SEARCH |PARTITIONED| + -- 
ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$11(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$11] |PARTITIONED| + -- UNNEST |UNPARTITIONED| + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| \ No newline at end of file
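The new `injectFieldAccessesForIndexes` method in `IntroduceSecondaryIndexInsertDeleteRule` assigns each distinct indexed field only once, even when several secondary indexes share that field. It does this by keying a map on `IndexFieldId`, which combines the source indicator (record part or meta part) with the field path. The sketch below shows only that deduplication idea in plain Java and does not use any AsterixDB APIs. `FieldKey`, `assignFieldVars`, the `0` indicator value and the `"$$N"` variable names are hypothetical stand-ins for `IndexFieldId`, the rule's loop, `Index.RECORD_INDICATOR` and `context.newVar()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldAccessDedupSketch {

    // Hypothetical stand-in for IndexFieldId: a key field is identified by its source
    // indicator (0 = record part in this sketch, an assumption) plus its nested field path.
    static final class FieldKey {
        final int indicator;
        final List<String> fieldPath;

        FieldKey(int indicator, List<String> fieldPath) {
            this.indicator = indicator;
            this.fieldPath = List.copyOf(fieldPath);
        }

        @Override
        public int hashCode() {
            // Same combination as IndexFieldId.hashCode() in the diff
            return 31 * indicator + fieldPath.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FieldKey)) {
                return false;
            }
            FieldKey other = (FieldKey) o;
            return indicator == other.indicator && fieldPath.equals(other.fieldPath);
        }
    }

    // Assigns one variable name per distinct key field across all indexes.
    // Each index is a list of field paths; every field is taken from the record part here.
    static Map<FieldKey, String> assignFieldVars(List<List<List<String>>> indexes) {
        Map<FieldKey, String> fieldAccessVars = new LinkedHashMap<>();
        int nextVar = 0;
        for (List<List<String>> indexKeys : indexes) {
            for (List<String> path : indexKeys) {
                FieldKey key = new FieldKey(0, path);
                if (fieldAccessVars.containsKey(key)) {
                    continue; // already handled for a different index
                }
                fieldAccessVars.put(key, "$$" + nextVar++);
            }
        }
        return fieldAccessVars;
    }

    public static void main(String[] args) {
        List<List<List<String>>> indexes = new ArrayList<>();
        indexes.add(List.of(List.of("age")));                               // index on age
        indexes.add(List.of(List.of("age"), List.of("address", "state")));  // composite index
        Map<FieldKey, String> vars = assignFieldVars(indexes);
        System.out.println(vars.size()); // prints 2: "age" is accessed only once
    }
}
```

Storing the field path as a `List<String>` keeps the nested path `["address", "state"]` distinct from a top-level field literally named `address.state`. Including the indicator in the key keeps identically named record and meta fields apart.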