asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dl...@apache.org
Subject [1/7] asterixdb git commit: [ASTERIXDB-2078][SQL] DISTINCT modifier for aggregate functions
Date Tue, 19 Sep 2017 17:52:11 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master e66346a34 -> 7a4b5681f


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7a4b5681/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index fe11f64..54e577f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -18,29 +18,15 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -51,16 +37,15 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 
-public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
-
-    private List<LogicalVariable> columnList;
+public class PreSortedDistinctByPOperator extends AbstractPreSortedDistinctByPOperator {
 
     public PreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
-        this.columnList = columnList;
+        super(columnList);
     }
 
-    public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
-        this.columnList = distinctByColumns;
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY;
     }
 
     @Override
@@ -69,66 +54,22 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
-        List<ILocalStructuralProperty> propsLocal = op2.getDeliveredPhysicalProperties().getLocalProperties();
-        deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-        List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
-        List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
-        for (LogicalVariable column : columnList) {
-            orderColumns.add(new OrderColumn(column, OrderKind.ASC));
-        }
-        localProps.add(new LocalOrderProperty(orderColumns));
-        IPartitioningProperty pp = null;
-        AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
-        if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
-                    context.getComputationNodeDomain());
-        }
-        pv[0] = new StructuralPropertiesVector(pp, localProps);
-        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
-        int sz = inputSchemas[0].getSize();
-        int fdSz = sz - columnList.size();
-        int[] fdColumns = new int[fdSz];
-        int j = 0;
-        for (LogicalVariable v : inputSchemas[0]) {
-            if (!columnList.contains(v)) {
-                fdColumns[j++] = inputSchemas[0].findVariable(v);
-            }
-        }
-        int[] keysAndDecs = new int[keys.length + fdColumns.length];
-        for (int i = 0; i < keys.length; i++) {
-            keysAndDecs[i] = keys[i];
-        }
-        for (int i = 0; i < fdColumns.length; i++) {
-            keysAndDecs[i + keys.length] = fdColumns[i];
-        }
+        int[] keysAndDecs = getKeysAndDecs(inputSchemas[0]);
 
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
                 .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context);
         IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
-        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories,
-                keysAndDecs);
+        IAggregatorDescriptorFactory aggregatorFactory =
+                new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories, keysAndDecs);
 
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
-                context);
-        /** make fd columns part of the key but the comparator only compares the distinct key columns */
+        RecordDescriptor recordDescriptor =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        /* make fd columns part of the key but the comparator only compares the distinct key columns */
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
                 comparatorFactories, aggregatorFactory, recordDescriptor);
 
@@ -137,14 +78,4 @@ public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY;
-    }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7a4b5681/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index a403211..2870074 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
@@ -59,7 +60,7 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite
                 AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue();
                 afce.setFunctionInfo(aei.newFunInfo);
                 afce.getArguments().clear();
-                afce.getArguments().add(new MutableObject<ILogicalExpression>(sai.stepOneResult));
+                afce.getArguments().add(new MutableObject<>(sai.stepOneResult));
             }
         }
     }
@@ -68,9 +69,6 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite
             GroupByOperator newGbyOp, Set<SimilarAggregatesInfo> toReplaceSet, IOptimizationContext context)
             throws AlgebricksException {
 
-        ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
-        ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
-
         List<LogicalVariable> initVars = initAgg.getVariables();
         List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions();
         int numExprs = initVars.size();
@@ -79,20 +77,22 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite
         for (int i = 0; i < numExprs; i++) {
             AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) initExprs.get(i).getValue();
             if (!aggFun.isTwoStep()) {
-                return new Pair<Boolean, Mutable<ILogicalOperator>>(false, null);
+                return new Pair<>(false, null);
             }
         }
 
+        ArrayList<LogicalVariable> pushedVars = new ArrayList<>();
+        ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<>();
+
         boolean haveAggToReplace = false;
         for (int i = 0; i < numExprs; i++) {
             Mutable<ILogicalExpression> expRef = initExprs.get(i);
             AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue();
             IFunctionInfo fi1 = aggFun.getStepOneAggregate();
             // Clone the aggregate's args.
-            List<Mutable<ILogicalExpression>> newArgs = new ArrayList<Mutable<ILogicalExpression>>(aggFun
-                    .getArguments().size());
+            List<Mutable<ILogicalExpression>> newArgs = new ArrayList<>(aggFun.getArguments().size());
             for (Mutable<ILogicalExpression> er : aggFun.getArguments()) {
-                newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression()));
+                newArgs.add(new MutableObject<>(er.getValue().cloneExpression()));
             }
             IFunctionInfo fi2 = aggFun.getStepTwoAggregate();
 
@@ -100,10 +100,10 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite
             LogicalVariable newAggVar = context.newVar();
             pushedVars.add(newAggVar);
             inf.stepOneResult = new VariableReferenceExpression(newAggVar);
-            inf.simAggs = new ArrayList<AggregateExprInfo>();
+            inf.simAggs = new ArrayList<>();
             toReplaceSet.add(inf);
             AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
-            pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
+            pushedExprs.add(new MutableObject<>(aggLocal));
             AggregateExprInfo aei = new AggregateExprInfo();
             aei.aggExprRef = expRef;
             aei.newFunInfo = fi2;
@@ -118,34 +118,43 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite
             if (newGbyOp != null) {
                 // Cut and paste nested input pipelines of initAgg to pushedAgg's input
                 Mutable<ILogicalOperator> inputRef = initAgg.getInputs().get(0);
+                if (!isPushableInput(inputRef.getValue())) {
+                    return new Pair<>(false, null);
+                }
                 Mutable<ILogicalOperator> bottomRef = inputRef;
                 while (bottomRef.getValue().getInputs().size() > 0) {
                     bottomRef = bottomRef.getValue().getInputs().get(0);
+                    if (!isPushableInput(bottomRef.getValue())) {
+                        return new Pair<>(false, null);
+                    }
                 }
                 ILogicalOperator oldNts = bottomRef.getValue();
                 initAgg.getInputs().clear();
-                initAgg.getInputs().add(new MutableObject<ILogicalOperator>(oldNts));
+                initAgg.getInputs().add(new MutableObject<>(oldNts));
 
                 // Hook up the nested aggregate op with the outer group by.
-                NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(
-                        newGbyOp));
+                NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<>(newGbyOp));
                 nts.setExecutionMode(ExecutionMode.LOCAL);
                 bottomRef.setValue(nts);
                 pushedAgg.getInputs().add(inputRef);
             } else {
                 // The local aggregate operator is fed by the input of the original aggregate operator.
-                pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
+                pushedAgg.getInputs().add(new MutableObject<>(initAgg.getInputs().get(0).getValue()));
                 // Reintroduce assign op for the global agg partitioning var.
                 initAgg.getInputs().get(0).setValue(pushedAgg);
                 pushedAgg.setGlobal(false);
                 context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
             }
-            return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
+            return new Pair<>(true, new MutableObject<ILogicalOperator>(pushedAgg));
         } else {
-            return new Pair<Boolean, Mutable<ILogicalOperator>>(haveAggToReplace, null);
+            return new Pair<>(haveAggToReplace, null);
         }
     }
 
+    protected boolean isPushableInput(ILogicalOperator op) {
+        return op.getOperatorTag() != LogicalOperatorTag.DISTINCT;
+    }
+
     protected class SimilarAggregatesInfo {
         ILogicalExpression stepOneResult;
         List<AggregateExprInfo> simAggs;
@@ -157,6 +166,6 @@ public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewrite
     }
 
     protected class BookkeepingInfo {
-        Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>();
+        Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<>();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7a4b5681/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 16ed9cb..84961d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -65,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsert
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -135,7 +136,12 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                 }
                 case DISTINCT: {
                     DistinctOperator distinct = (DistinctOperator) op;
-                    distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
+                    if (topLevelOp) {
+                        distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
+                    } else {
+                        distinct.setPhysicalOperator(
+                                new MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
+                    }
                     break;
                 }
                 case EMPTYTUPLESOURCE: {


Mime
View raw message