asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steven Jacobs (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in hyracks[master]: ASTERIXDB-1018, ASTERIXDB-1017, ASTERIXDB-1019, ASTERIXDB-1029, ASTERIXDB-1030, ASTERIXDB-1034
Date Thu, 12 Nov 2015 19:42:28 GMT
Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/488

Change subject: ASTERIXDB-1018, ASTERIXDB-1017, ASTERIXDB-1019, ASTERIXDB-1029, ASTERIXDB-1030, ASTERIXDB-1034
......................................................................

ASTERIXDB-1018, ASTERIXDB-1017, ASTERIXDB-1019, ASTERIXDB-1029, ASTERIXDB-1030, ASTERIXDB-1034

Change-Id: I18cfa3875d676f71b26e91433ff101a7e725c890
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
5 files changed, 78 insertions(+), 16 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/88/488/1

diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 74200b4..046d497 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -70,7 +70,8 @@
     }
 
     protected PhysicalRequirements emptyUnaryRequirements() {
-        StructuralPropertiesVector[] req = new StructuralPropertiesVector[] { StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR
};
+        StructuralPropertiesVector[] req = new StructuralPropertiesVector[] {
+                StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
         return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
@@ -103,7 +104,8 @@
         return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
     }
 
-    protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op,
IOperatorDescriptor opDesc) {
+    protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op,
+            IOperatorDescriptor opDesc) {
         if (op.getExecutionMode() == ExecutionMode.UNPARTITIONED) {
             AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
             builder.contributeAlgebricksPartitionConstraint(opDesc, apc);
@@ -113,22 +115,27 @@
 
     protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
             AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext
context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()];
         PlanCompiler pc = new PlanCompiler(context);
         int i = 0;
         for (ILogicalPlan p : npOp.getNestedPlans()) {
-            subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema,
pc);
+            subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, opSchema, pc);
         }
         return subplans;
     }
 
     private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema
outerPlanSchema,
-            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler
pc) throws AlgebricksException {
+            IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots are not supported.");
         }
         JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
+        return getPipeline(p, opSchema, pc, nestedJob);
+    }
+
+    protected AlgebricksPipeline getPipeline(ILogicalPlan p, IOperatorSchema opSchema, PlanCompiler
pc,
+            JobSpecification nestedJob) throws AlgebricksException {
         ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
         JobGenContext context = pc.getContext();
         IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
@@ -136,10 +143,8 @@
 
         Map<OperatorDescriptorId, IOperatorDescriptor> opMap = nestedJob.getOperatorMap();
         if (opMap.size() != 1) {
-            throw new AlgebricksException(
-                    "Attempting to construct a nested plan with "
-                            + opMap.size()
-                            + " operator descriptors. Currently, nested plans can only consist
in linear pipelines of Asterix micro operators.");
+            throw new AlgebricksException("Attempting to construct a nested plan with " +
opMap.size()
+                    + " operator descriptors. Currently, nested plans can only consist in
linear pipelines of Asterix micro operators.");
         }
 
         for (OperatorDescriptorId oid : opMap.keySet()) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index ecb7186..ac4adea 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
@@ -31,6 +30,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
@@ -39,10 +39,12 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.operators.meta.SubplanRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
 
 public class SubplanPOperator extends AbstractPhysicalOperator {
 
@@ -85,16 +87,16 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         SubplanOperator subplan = (SubplanOperator) op;
         if (subplan.getNestedPlans().size() != 1) {
             throw new NotImplementedException("Subplan currently works only for one nested
plan with one root.");
         }
-        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], subplan, opSchema,
context);
+        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], subplan, opSchema,
context, builder);
         assert (subplans.length == 1);
         AlgebricksPipeline np = subplans[0];
-        RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()),
-                inputSchemas[0], context);
+        RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0],
context);
         INullWriterFactory[] nullWriterFactories = new INullWriterFactory[np.getOutputWidth()];
         for (int i = 0; i < nullWriterFactories.length; i++) {
             nullWriterFactories[i] = context.getNullWriterFactory();
@@ -108,6 +110,29 @@
         builder.contributeGraphEdge(src, 0, op, 0);
     }
 
+    protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext
context,
+            IHyracksJobBuilder builder) throws AlgebricksException {
+        AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()];
+        PlanCompiler pc = new PlanCompiler(context);
+        int i = 0;
+        for (ILogicalPlan p : npOp.getNestedPlans()) {
+            subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema,
pc, builder);
+        }
+        return subplans;
+    }
+
+    private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema
outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler
pc, IHyracksJobBuilder builder)
+                    throws AlgebricksException {
+        if (p.getRoots().size() > 1) {
+            throw new NotImplementedException("Nested plans with several roots are not supported.");
+        }
+        JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema,
+                builder.getJobSpec().getJobletEventListenerFactory());
+        return getPipeline(p, opSchema, pc, nestedJob);
+    }
+
     @Override
     public boolean expensiveThanMaterialization() {
         return true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
index 67e8d42..cd70769 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
@@ -188,9 +188,18 @@
         Set<LogicalVariable> pkVars = computeGbyVars(outerNts, free, context);
         if (pkVars == null || pkVars.size() < 1) {
             // there is no non-trivial primary key, group-by keys are all live variables
+            // that were produced by descendant or self
             ILogicalOperator subplanInput = subplan.getInputs().get(0).getValue();
             pkVars = new HashSet<LogicalVariable>();
+            //get live variables
             VariableUtilities.getLiveVariables(subplanInput, pkVars);
+
+            //get produced variables
+            Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+            VariableUtilities.getProducedVariablesInDescendantsAndSelf(subplanInput, producedVars);
+
+            //retain the intersection
+            pkVars.retainAll(producedVars);
         }
         AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Found FD for introducing group-by: " + pkVars);
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
index d6f1889..c8199dd 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
@@ -64,6 +64,7 @@
 
         List<ILogicalOperator> pushedOnLeft = new ArrayList<ILogicalOperator>();
         List<ILogicalOperator> pushedOnRight = new ArrayList<ILogicalOperator>();
+        List<ILogicalOperator> pushedOnEither = new ArrayList<ILogicalOperator>();
         LinkedList<ILogicalOperator> notPushedStack = new LinkedList<ILogicalOperator>();
         Collection<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
         Collection<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
@@ -107,7 +108,9 @@
                 } else {
                     VariableUtilities.getUsedVariables(opIter, usedVars);
                     VariableUtilities.getProducedVariables(opIter, producedVars);
-                    if (joinLiveVarsLeft.containsAll(usedVars)) {
+                    if (usedVars.size() == 0) {
+                        pushedOnEither.add(opIter);
+                    } else if (joinLiveVarsLeft.containsAll(usedVars)) {
                         pushedOnLeft.add(opIter);
                         liveInOpsToPushLeft.addAll(producedVars);
                     } else if (joinLiveVarsRight.containsAll(usedVars)) {
@@ -149,6 +152,12 @@
             return false;
         }
         if (needToPushOps) {
+            //We should push independent ops into the first branch that the selection depends
on
+            if (intersectsBranch[0]) {
+                pushOps(pushedOnEither, joinBranchLeftRef, context);
+            } else {
+                pushOps(pushedOnEither, joinBranchRightRef, context);
+            }
             pushOps(pushedOnLeft, joinBranchLeftRef, context);
             pushOps(pushedOnRight, joinBranchRightRef, context);
         }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 134e6fb..6a6c1cd 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -25,7 +25,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -96,12 +95,27 @@
                     currentOpRef = currentOpRef.getValue().getInputs().get(0);
                 }
             }
+        } else {
+            //Move the boundary below any top const assigns
+            boundaryOpRef = opRef.getValue().getInputs().get(0);
+            while (boundaryOpRef.getValue().getOperatorTag() == LogicalOperatorTag.ASSIGN)
{
+                List<LogicalVariable> opUsedVars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getUsedVariables(boundaryOpRef.getValue(), opUsedVars);
+                if (opUsedVars.size() == 0) {
+                    // move down the boundary if the operator is a const assigns
+                    boundaryOpRef = boundaryOpRef.getValue().getInputs().get(0);
+                } else {
+                    break;
+                }
+            }
+
         }
 
         /** join the two independent branches */
         InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE),
                 new MutableObject<ILogicalOperator>(boundaryOpRef.getValue()), new
MutableObject<ILogicalOperator>(
                         opRef.getValue()));
+
         opRef.setValue(join);
         ILogicalOperator ets = new EmptyTupleSourceOperator();
         boundaryOpRef.setValue(ets);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/488
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I18cfa3875d676f71b26e91433ff101a7e725c890
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco002@ucr.edu>

Mime
View raw message