asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianf...@apache.org
Subject [5/5] asterixdb git commit: Change logical plan to apply filter from 2ndary index
Date Wed, 07 Jun 2017 18:43:16 GMT
Change logical plan to apply filter from 2ndary index

- Changes the IntroduceLSMComponentFilterRule to
replace the constant filter value from the query with the value
carried from 2ndary index search.
- Can use 2ndary index filter even if the query doesn't contain
any filter-related condition.

Change-Id: I0e2fe0208662e5dcd49d1a22bfb58f96533e9497
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1727
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/de0ece7f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/de0ece7f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/de0ece7f

Branch: refs/heads/master
Commit: de0ece7f151cd26cf3bf455fd0aa95df51f35d55
Parents: 8cbb05c
Author: Jianfeng Jia <jianfeng.jia@gmail.com>
Authored: Tue Jun 6 22:43:26 2017 -0700
Committer: Jianfeng Jia <jianfeng.jia@gmail.com>
Committed: Wed Jun 7 11:41:56 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 .../physical/BTreeSearchPOperator.java          |    3 +-
 .../physical/IndexSearchPOperator.java          |    2 +-
 .../physical/InvertedIndexPOperator.java        |   27 +-
 .../physical/RTreeSearchPOperator.java          |   10 +-
 .../am/IntroduceLSMComponentFilterRule.java     |  195 +-
 asterixdb/asterix-app/data/twitter/real.2.adm   | 5000 ++++++++++++++++++
 asterixdb/asterix-app/data/twitter/real.adm     | 5000 ++++++++++++++++++
 .../btree-btree-search-wo-query-filter.aql      |   38 +
 .../queries/filter/btree-btree-search.aql       |   42 +
 .../inverted-btree-search-wo-query-filter.aql   |   40 +
 .../queries/filter/inverted-btree-search.aql    |   43 +
 ...multi-index-btree-search-wo-query-filter.aql |   45 +
 .../queries/filter/multi-index-btree-search.aql |   48 +
 .../rtree-btree-search-wo-query-filter.aql      |   40 +
 .../queries/filter/rtree-btree-search.aql       |   43 +
 .../btree-btree-search-wo-query-filter.plan     |   15 +
 .../results/filter/btree-btree-search.plan      |   17 +
 .../inverted-btree-search-wo-query-filter.plan  |   13 +
 .../results/filter/inverted-btree-search.plan   |   15 +
 ...ulti-index-btree-search-wo-query-filter.plan |   33 +
 .../filter/multi-index-btree-search.plan        |   35 +
 .../rtree-btree-search-wo-query-filter.plan     |   15 +
 .../results/filter/rtree-btree-search.plan      |   17 +
 ...btree-rtree-ngram-intersect-with-filter.plan |   36 +-
 .../queries/filters/delete/delete.1.ddl.aql     |   96 +
 .../queries/filters/delete/delete.10.query.aql  |   27 +
 .../queries/filters/delete/delete.2.update.aql  |   25 +
 .../queries/filters/delete/delete.3.server.aql  |   26 +
 .../queries/filters/delete/delete.4.sleep.aql   |   22 +
 .../queries/filters/delete/delete.5.server.aql  |   22 +
 .../queries/filters/delete/delete.6.update.aql  |   22 +
 .../queries/filters/delete/delete.7.server.aql  |   23 +
 .../queries/filters/delete/delete.8.sleep.aql   |   20 +
 .../queries/filters/delete/delete.9.server.aql  |   20 +
 .../queries/filters/upsert/upsert.1.ddl.aql     |   96 +
 .../queries/filters/upsert/upsert.10.query.aql  |   26 +
 .../queries/filters/upsert/upsert.11.ddl.aql    |   20 +
 .../queries/filters/upsert/upsert.2.update.aql  |   27 +
 .../queries/filters/upsert/upsert.3.server.aql  |   26 +
 .../queries/filters/upsert/upsert.4.sleep.aql   |   22 +
 .../queries/filters/upsert/upsert.5.server.aql  |   22 +
 .../queries/filters/upsert/upsert.6.update.aql  |   55 +
 .../queries/filters/upsert/upsert.7.server.aql  |   23 +
 .../queries/filters/upsert/upsert.8.sleep.aql   |   20 +
 .../queries/filters/upsert/upsert.9.server.aql  |   20 +
 .../tinysocial-intersect.1.ddl.aql              |   46 +
 .../tinysocial-intersect.2.update.aql           |   23 +
 .../tinysocial-intersect.3.query.aql            |   28 +
 .../filters/delete/delete.1.ddl.sqlpp           |   96 +
 .../filters/delete/delete.10.query.sqlpp        |   23 +
 .../filters/delete/delete.11.ddl.sqlpp          |   27 +
 .../filters/delete/delete.2.update.sqlpp        |   24 +
 .../filters/delete/delete.3.server.sqlpp        |   26 +
 .../filters/delete/delete.4.sleep.sqlpp         |   20 +
 .../filters/delete/delete.5.server.sqlpp        |   20 +
 .../filters/delete/delete.6.update.sqlpp        |   22 +
 .../filters/delete/delete.7.server.sqlpp        |   23 +
 .../filters/delete/delete.8.sleep.sqlpp         |   20 +
 .../filters/delete/delete.9.server.sqlpp        |   20 +
 .../filters/upsert/upsert.1.ddl.sqlpp           |   97 +
 .../filters/upsert/upsert.10.query.sqlpp        |   23 +
 .../filters/upsert/upsert.11.ddl.sqlpp          |   27 +
 .../filters/upsert/upsert.2.update.sqlpp        |   26 +
 .../filters/upsert/upsert.3.server.sqlpp        |   26 +
 .../filters/upsert/upsert.4.sleep.sqlpp         |   20 +
 .../filters/upsert/upsert.5.server.sqlpp        |   20 +
 .../filters/upsert/upsert.6.update.sqlpp        |   53 +
 .../filters/upsert/upsert.7.server.sqlpp        |   23 +
 .../filters/upsert/upsert.8.sleep.sqlpp         |   20 +
 .../filters/upsert/upsert.9.server.sqlpp        |   20 +
 .../results/filters/delete/delete.1.adm         |    0
 .../results/filters/upsert/upsert.1.adm         |    1 +
 .../intersection-with-filter/intersection.1.adm |    1 +
 .../src/test/resources/runtimets/testsuite.xml  |   15 +
 .../resources/runtimets/testsuite_sqlpp.xml     |   10 +
 .../metadata/declared/DatasetDataSource.java    |    2 +-
 .../metadata/declared/MetadataProvider.java     |   11 +-
 .../operators/logical/AbstractScanOperator.java |    5 +
 .../logical/AbstractUnnestMapOperator.java      |   35 +
 .../operators/logical/IntersectOperator.java    |   73 +-
 .../operators/logical/UnnestMapOperator.java    |    2 +-
 .../operators/physical/IntersectPOperator.java  |   35 +-
 .../intersect/IntersectOperatorDescriptor.java  |   91 +-
 .../unit/IntersectOperatorDescriptorTest.java   |    4 +-
 85 files changed, 12367 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 295d874..569eb3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ dist
 *.swp
 .m2*
 ß
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 9dd57d5..2fd9079 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -115,6 +115,7 @@ public class BTreeSearchPOperator extends IndexSearchPOperator {
 
         int[] minFilterFieldIndexes = getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas);
         int[] maxFilterFieldIndexes = getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas);
+        boolean propagateFilter = unnestMap.propagateIndexFilter();
 
         MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
         Dataset dataset = metadataProvider.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
@@ -124,7 +125,7 @@ public class BTreeSearchPOperator extends IndexSearchPOperator {
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
                 builder.getJobSpec(), opSchema, typeEnv, context, jobGenParams.getRetainInput(),
retainMissing,
                 dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
-                jobGenParams.isHighKeyInclusive(), minFilterFieldIndexes, maxFilterFieldIndexes);
+                jobGenParams.isHighKeyInclusive(), propagateFilter, minFilterFieldIndexes,
maxFilterFieldIndexes);
 
         builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
index ce43480..9f46e6a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -63,7 +63,7 @@ public abstract class IndexSearchPOperator extends AbstractScanPOperator
{
         IDataSource<?> ds = idx.getDataSource();
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
-        deliveredProperties = dspp.computePropertiesVector(as.getVariables());
+        deliveredProperties = dspp.computePropertiesVector(as.getScanVariables());
     }
 
     protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[]
inputSchemas) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 50c762e..213c60b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -135,13 +135,16 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
             AbstractUnnestMapOperator unnestMap, IOperatorSchema opSchema, boolean retainInput,
boolean retainMissing,
             String datasetName, Dataset dataset, String indexName, ATypeTag searchKeyType,
int[] keyFields,
             SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold,
-            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean isFullTextSearchQuery)
-            throws AlgebricksException {
+            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
+            boolean isFullTextSearchQuery) throws AlgebricksException {
         try {
+
+            boolean propagateIndexFilter = unnestMap.propagateIndexFilter();
             IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
             int numPrimaryKeys = dataset.getPrimaryKeys().size();
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                    dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+            Index secondaryIndex = MetadataManager.INSTANCE
+                    .getIndex(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName);
             if (secondaryIndex == null) {
                 throw new AlgebricksException(
                         "Code generation error: no index " + indexName + " for dataset "
+ datasetName);
@@ -160,13 +163,15 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
             IIndexDataflowHelperFactory dataflowHelperFactory =
                     new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                             secondarySplitsAndConstraint.first);
-            LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
-                    jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory,
-                    searchModifierFactory, retainInput, retainMissing, context.getMissingWriterFactory(),
-                    dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
secondaryIndex,
-                            ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
-                            IndexOperation.SEARCH, null),
-                    minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery,
numPrimaryKeys, false);
+            LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
+                    new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc,
queryField,
+                            dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory,
retainInput,
+                            retainMissing, context.getMissingWriterFactory(),
+                            dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
+                                    secondaryIndex,
+                                    ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
+                                    IndexOperation.SEARCH, null), minFilterFieldIndexes,
maxFilterFieldIndexes,
+                            isFullTextSearchQuery, numPrimaryKeys, propagateIndexFilter);
             return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index f9d4c80..733e62f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -66,7 +66,7 @@ public class RTreeSearchPOperator extends IndexSearchPOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         AbstractUnnestMapOperator unnestMap = (AbstractUnnestMapOperator) op;
         ILogicalExpression unnestExpr = unnestMap.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
@@ -81,6 +81,7 @@ public class RTreeSearchPOperator extends IndexSearchPOperator {
         jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
         int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
 
+        boolean propagateIndexFilter = unnestMap.propagateIndexFilter();
         int[] minFilterFieldIndexes = getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas);
         int[] maxFilterFieldIndexes = getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas);
 
@@ -97,9 +98,10 @@ public class RTreeSearchPOperator extends IndexSearchPOperator {
             // By nature, LEFT_OUTER_UNNEST_MAP should generate null values for non-matching
tuples.
             retainNull = true;
         }
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = mp.buildRtreeRuntime(
-                builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(),
retainNull,
-                dataset, jobGenParams.getIndexName(), keyIndexes, minFilterFieldIndexes,
maxFilterFieldIndexes);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch =
+                mp.buildRtreeRuntime(builder.getJobSpec(), outputVars, opSchema, typeEnv,
context,
+                        jobGenParams.getRetainInput(), retainNull, dataset, jobGenParams.getIndexName(),
keyIndexes,
+                        propagateIndexFilter, minFilterFieldIndexes, maxFilterFieldIndexes);
 
         builder.contributeHyracksOperator(unnestMap, rtreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, rtreeSearch.second);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index 83b277d..95f0de9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -19,11 +19,16 @@
 package org.apache.asterix.optimizer.rules.am;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -56,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
@@ -63,6 +69,8 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
 
+    static final Logger LOGGER = Logger.getLogger(IntroduceLSMComponentFilterRule.class.getName());
+
     protected IVariableTypeEnvironment typeEnvironment = null;
 
     @Override
@@ -80,12 +88,6 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule
{
         }
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        typeEnvironment = context.getOutputTypeEnvironment(op);
-        ILogicalExpression condExpr = ((SelectOperator) op).getCondition().getValue();
-        AccessMethodAnalysisContext analysisCtx = analyzeCondition(condExpr, context, typeEnvironment);
-        if (analysisCtx.getMatchedFuncExprs().isEmpty()) {
-            return false;
-        }
 
         Dataset dataset = getDataset(op, context);
         List<String> filterFieldName = null;
@@ -101,22 +103,33 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule
{
         if (filterFieldName == null || recType == null) {
             return false;
         }
-        List<Index> datasetIndexes = ((MetadataProvider) context.getMetadataProvider())
-                .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
+
+        IAType filterType = recType.getSubFieldType(filterFieldName);
+
+        typeEnvironment = context.getOutputTypeEnvironment(op);
+        ILogicalExpression condExpr = ((SelectOperator) op).getCondition().getValue();
+        AccessMethodAnalysisContext analysisCtx = analyzeCondition(condExpr, context, typeEnvironment);
 
         List<IOptimizableFuncExpr> optFuncExprs = new ArrayList<>();
 
-        for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) {
-            IOptimizableFuncExpr optFuncExpr = analysisCtx.getMatchedFuncExpr(i);
-            boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, recType, datasetIndexes,
context);
-            if (found && optFuncExpr.getFieldName(0).equals(filterFieldName)) {
-                optFuncExprs.add(optFuncExpr);
+        if (!analysisCtx.getMatchedFuncExprs().isEmpty()) {
+            List<Index> datasetIndexes = ((MetadataProvider) context.getMetadataProvider())
+                    .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
+
+            for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) {
+                IOptimizableFuncExpr optFuncExpr = analysisCtx.getMatchedFuncExpr(i);
+                boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, recType,
datasetIndexes, context);
+                if (found && optFuncExpr.getFieldName(0).equals(filterFieldName))
{
+                    optFuncExprs.add(optFuncExpr);
+                }
             }
         }
+
         if (optFuncExprs.isEmpty()) {
-            return false;
+            assignFilterFromSecondaryUnnestMap(op, dataset, context, filterType);
+        } else {
+            assignFilterFromQuery(optFuncExprs, op, dataset, context, filterType);
         }
-        changePlan(optFuncExprs, op, dataset, context);
 
         OperatorPropertiesUtil.typeOpRec(opRef, context);
         context.addToDontApplySet(this, op);
@@ -147,9 +160,11 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule
{
         return new AssignOperator(assignKeyVarList, assignKeyExprList);
     }
 
-    private void changePlan(List<IOptimizableFuncExpr> optFuncExprs, AbstractLogicalOperator
op, Dataset dataset,
-            IOptimizationContext context) throws AlgebricksException {
+    private void assignFilterFromQuery(List<IOptimizableFuncExpr> optFuncExprs, AbstractLogicalOperator
op,
+            Dataset dataset, IOptimizationContext context, IAType filterType) throws AlgebricksException
{
 
+        List<UnnestMapOperator> primaryUnnestMapOps = new ArrayList<>();
+        boolean hasSecondaryIndexMap = false;
         Queue<Mutable<ILogicalOperator>> queue = new LinkedList<>(op.getInputs());
         while (!queue.isEmpty()) {
             AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) queue.poll().getValue();
@@ -176,8 +191,7 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule
{
 
                     dataSourceScanOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
 
-                    assignOp.getInputs()
-                            .add(new MutableObject<>(dataSourceScanOp.getInputs().get(0).getValue()));
+                    assignOp.getInputs().add(new MutableObject<>(dataSourceScanOp.getInputs().get(0).getValue()));
                     dataSourceScanOp.getInputs().get(0).setValue(assignOp);
                 }
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
@@ -207,14 +221,153 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule
{
                                     .add(new MutableObject<ILogicalExpression>(new
VariableReferenceExpression(var)));
                         }
                         unnestMapOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                        assignOp.getInputs()
-                                .add(new MutableObject<>(unnestMapOp.getInputs().get(0).getValue()));
+                        assignOp.getInputs().add(new MutableObject<>(unnestMapOp.getInputs().get(0).getValue()));
                         unnestMapOp.getInputs().get(0).setValue(assignOp);
+
+                        if (jobGenParams.isPrimaryIndex) {
+                            primaryUnnestMapOps.add(unnestMapOp);
+                        } else {
+                            hasSecondaryIndexMap = true;
+                        }
                     }
                 }
             }
             queue.addAll(descendantOp.getInputs());
         }
+        if (hasSecondaryIndexMap && !primaryUnnestMapOps.isEmpty()) {
+            propagateFilterToPrimaryIndex(primaryUnnestMapOps, filterType, context);
+        }
+    }
+
+    private void propagateFilterToPrimaryIndex(List<UnnestMapOperator> primaryUnnestMapOps,
IAType filterType,
+            IOptimizationContext context) throws AlgebricksException {
+        for (UnnestMapOperator primaryOp : primaryUnnestMapOps) {
+            Mutable<ILogicalOperator> assignOrOrderOrIntersect = primaryOp.getInputs().get(0);
+            Mutable<ILogicalOperator> intersectOrSort = assignOrOrderOrIntersect;
+
+            if (assignOrOrderOrIntersect.getValue().getOperatorTag() == LogicalOperatorTag.ASSIGN)
{
+                intersectOrSort = assignOrOrderOrIntersect.getValue().getInputs().get(0);
+            }
+
+            switch (intersectOrSort.getValue().getOperatorTag()) {
+                case INTERSECT:
+                    IntersectOperator intersect = (IntersectOperator) (intersectOrSort.getValue());
+                    List<List<LogicalVariable>> filterVars = new ArrayList<>(intersect.getInputs().size());
+                    for (Mutable<ILogicalOperator> mutableOp : intersect.getInputs())
{
+                        ILogicalOperator child = mutableOp.getValue();
+                        while (!child.getOperatorTag().equals(LogicalOperatorTag.UNNEST_MAP))
{
+                            child = child.getInputs().get(0).getValue();
+                        }
+                        UnnestMapOperator unnestMap = (UnnestMapOperator) child;
+                        propagateFilterInSecondaryUnnsetMap(unnestMap, filterType, context);
+
+                        List<LogicalVariable> extraVars = Arrays.asList(unnestMap.getPropagateIndexMinFilterVar(),
+                                unnestMap.getPropagateIndexMaxFilterVar());
+                        filterVars.add(extraVars);
+                    }
+                    if (!filterVars.isEmpty()) {
+                        List<LogicalVariable> outputFilterVars = new ArrayList<>(filterVars.get(0));
+                        IntersectOperator intersectWithFilter =
+                                createIntersectWithFilter(outputFilterVars, filterVars, intersect);
+
+                        intersectOrSort.setValue(intersectWithFilter);
+                        context.computeAndSetTypeEnvironmentForOperator(intersectWithFilter);
+                        setPrimaryFilterVar(primaryOp, outputFilterVars.get(0), outputFilterVars.get(1),
context);
+                    }
+                    break;
+                case ORDER:
+                    ILogicalOperator child = intersectOrSort.getValue().getInputs().get(0).getValue();
+                    if (child.getOperatorTag().equals(LogicalOperatorTag.UNNEST_MAP)) {
+                        UnnestMapOperator secondaryMap = (UnnestMapOperator) child;
+
+                        propagateFilterInSecondaryUnnsetMap(secondaryMap, filterType, context);
+
+                        setPrimaryFilterVar(primaryOp, secondaryMap.getPropagateIndexMinFilterVar(),
+                                secondaryMap.getPropagateIndexMaxFilterVar(), context);
+                    }
+                    break;
+                default:
+                    throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                            intersectOrSort.getValue().getOperatorTag().toString());
+            }
+        }
+    }
+
+    private IntersectOperator createIntersectWithFilter(List<LogicalVariable> outputFilterVars,
+            List<List<LogicalVariable>> filterVars, IntersectOperator intersect)
throws AlgebricksException {
+        List<LogicalVariable> outputVars = new ArrayList<>();
+        outputVars.addAll(intersect.getOutputVars());
+        outputVars.addAll(outputFilterVars);
+
+        List<List<LogicalVariable>> compareVars = new ArrayList<>(intersect.getNumInput());
+        for (int i = 0; i < intersect.getNumInput(); i++) {
+            compareVars.add(new ArrayList<>(intersect.getCompareVariables(i)));
+        }
+
+        IntersectOperator intersectWithFilter = new IntersectOperator(outputVars, compareVars,
filterVars);
+        intersectWithFilter.getInputs().addAll(intersect.getInputs());
+        return intersectWithFilter;
+    }
+
+    private void propagateFilterInSecondaryUnnsetMap(UnnestMapOperator secondaryUnnest, IAType
filterType,
+            IOptimizationContext context) throws AlgebricksException {
+
+        LogicalVariable minIndexFilterVar = context.newVar();
+        LogicalVariable maxIndexFilterVar = context.newVar();
+        secondaryUnnest.markPropagageIndexFilter();
+        secondaryUnnest.getVariables().add(minIndexFilterVar);
+        secondaryUnnest.getVariableTypes().add(filterType);
+        secondaryUnnest.getVariables().add(maxIndexFilterVar);
+        secondaryUnnest.getVariableTypes().add(filterType);
+
+        context.computeAndSetTypeEnvironmentForOperator(secondaryUnnest);
+    }
+
+    private void setPrimaryFilterVar(UnnestMapOperator primaryOp, LogicalVariable minFilterVar,
+            LogicalVariable maxFilterVar, IOptimizationContext context) throws AlgebricksException
{
+        primaryOp.setMinFilterVars(Collections.singletonList(minFilterVar));
+        primaryOp.setMaxFilterVars(Collections.singletonList(maxFilterVar));
+
+        List<Mutable<ILogicalExpression>> indexFilterExpression =
+                Arrays.asList(new MutableObject<>(new VariableReferenceExpression(minFilterVar)),
+                        new MutableObject<>(new VariableReferenceExpression(maxFilterVar)));
+
+        primaryOp.setAdditionalFilteringExpressions(indexFilterExpression);
+        context.computeAndSetTypeEnvironmentForOperator(primaryOp);
+    }
+
+    private void assignFilterFromSecondaryUnnestMap(AbstractLogicalOperator op, Dataset dataset,
+            IOptimizationContext context, IAType filterType) throws AlgebricksException {
+        List<UnnestMapOperator> primaryUnnestMapOps = new ArrayList<>();
+        boolean hasSecondaryIndexMap = false;
+        Queue<Mutable<ILogicalOperator>> queue = new LinkedList<>(op.getInputs());
+        while (!queue.isEmpty()) {
+            ILogicalOperator descendantOp = queue.poll().getValue();
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+                UnnestMapOperator unnestMapOp = (UnnestMapOperator) descendantOp;
+                ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+                if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL)
{
+                    AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+                    FunctionIdentifier fid = f.getFunctionIdentifier();
+                    if (!fid.equals(BuiltinFunctions.INDEX_SEARCH)) {
+                        throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
fid.getName());
+                    }
+                    AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+                    jobGenParams.readFromFuncArgs(f.getArguments());
+                    if (dataset.getDatasetName().compareTo(jobGenParams.datasetName) == 0)
{
+                        if (jobGenParams.isPrimaryIndex) {
+                            primaryUnnestMapOps.add(unnestMapOp);
+                        } else {
+                            hasSecondaryIndexMap = true;
+                        }
+                    }
+                }
+            }
+            queue.addAll(descendantOp.getInputs());
+        }
+        if (hasSecondaryIndexMap && !primaryUnnestMapOps.isEmpty()) {
+            propagateFilterToPrimaryIndex(primaryUnnestMapOps, filterType, context);
+        }
     }
 
     private Dataset getDataset(AbstractLogicalOperator op, IOptimizationContext context)
throws AlgebricksException {


Mime
View raw message