asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject incubator-asterixdb-hyracks git commit: Changes to support Feeds 2.0 (random partitioning of tuples)
Date Mon, 29 Jun 2015 19:44:23 GMT
Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 0d87a57f7 -> 367756a44


Changes to support Feeds 2.0 (random partitioning of tuples)

commit 8b5c352d831aa0d7e006457f0b1430ac12b54731
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Sat Jun 20 17:16:11 2015 -0700

    Changes to support Feeds 2.0 (random partitioning of tuples)

Change-Id: I712c1f019cbc43f66d50620772c3df03c5944394
Reviewed-on: https://asterix-gerrit.ics.uci.edu/296
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/367756a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/367756a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/367756a4

Branch: refs/heads/master
Commit: 367756a44c82524a6da56f57924cdb6315c75490
Parents: 0d87a57
Author: ramangrover29 <ramangrover29@gmail.com>
Authored: Fri Jun 26 15:51:59 2015 -0700
Committer: Steven Jacobs <sjaco002@ucr.edu>
Committed: Mon Jun 29 12:30:54 2015 -0700

----------------------------------------------------------------------
 .../core/algebra/base/PhysicalOperatorTag.java  |  1 +
 .../operators/physical/AssignPOperator.java     | 15 +++-
 .../physical/RandomPartitionPOperator.java      | 72 ++++++++++++++++++++
 .../rules/EnforceStructuralPropertiesRule.java  | 14 +---
 .../RandomPartitionComputerFactory.java         | 35 ++++++++++
 5 files changed, 124 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/367756a4/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index d0be2a1..0c9e89a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -45,6 +45,7 @@ public enum PhysicalOperatorTag {
     PARTITIONINGSPLIT,
     PRE_CLUSTERED_GROUP_BY,
     PRE_SORTED_DISTINCT_BY,
+    RANDOM_PARTITION_EXCHANGE,
     RANDOM_MERGE_EXCHANGE,
     RANGE_PARTITION_EXCHANGE,
     RANGE_PARTITION_MERGE_EXCHANGE,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/367756a4/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 26313fb..0a67ced 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -18,6 +18,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -39,6 +40,7 @@ import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 public class AssignPOperator extends AbstractPhysicalOperator {
 
     private boolean flushFramesRapidly;
+    private int cardinalityConstraint = 0;
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -87,7 +89,13 @@ public class AssignPOperator extends AbstractPhysicalOperator {
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
-        builder.contributeMicroOperator(assign, runtime, recDesc);
+        if (cardinalityConstraint > 0) {
+            AlgebricksCountPartitionConstraint countConstraint = new AlgebricksCountPartitionConstraint(
+                    cardinalityConstraint);
+            builder.contributeMicroOperator(assign, runtime, recDesc, countConstraint);
+        } else {
+            builder.contributeMicroOperator(assign, runtime, recDesc);
+        }
         // and contribute one edge from its child
         ILogicalOperator src = assign.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, assign, 0);
@@ -103,6 +111,11 @@ public class AssignPOperator extends AbstractPhysicalOperator {
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
+    public void setCardinalityConstraint(int cardinality) {
+        this.cardinalityConstraint = cardinality;
+    }
+
+
     @Override
     public boolean expensiveThanMaterialization() {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/367756a4/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
new file mode 100644
index 0000000..aa4852a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RandomPartitionPOperator extends AbstractExchangePOperator {
+
+    private final INodeDomain domain;
+
+    public RandomPartitionPOperator(INodeDomain domain) {
+        this.domain = domain;
+    }
+
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+                opSchema, context);
+        builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+        MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
+                .getDeliveredPhysicalProperties().getLocalProperties());
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/367756a4/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index ebbca8b..ebd7da1 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -56,6 +56,7 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemorySt
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
@@ -538,18 +539,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                 case RANDOM: {
                     RandomPartitioningProperty rpp = (RandomPartitioningProperty) pp;
                     INodeDomain nd = rpp.getNodeDomain();
-                    if (nd == null) {
-                        throw new AlgebricksException("Unknown node domain for " + rpp);
-                    }
-                    if (nd.cardinality() == null) {
-                        throw new AlgebricksException("Unknown cardinality for node domain " + nd);
-                    }
-                    if (nd.cardinality() != 1) {
-                        throw new NotImplementedException(
-                                "Random repartitioning is only implemented for target domains of"
-                                        + "cardinality equal to 1.");
-                    }
-                    pop = new BroadcastPOperator(nd);
+                    pop = new RandomPartitionPOperator(nd);
                     break;
                 }
                 default: {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/367756a4/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
new file mode 100644
index 0000000..4f43584
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition;
+
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class RandomPartitionComputerFactory implements
+		ITuplePartitionComputerFactory {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int domainCardinality;
+
+	public RandomPartitionComputerFactory(int domainCardinality) {
+		this.domainCardinality = domainCardinality;
+	}
+
+	@Override
+	public ITuplePartitionComputer createPartitioner() {
+		return new ITuplePartitionComputer() {
+
+			private final Random random = new Random();
+
+			@Override
+			public int partition(IFrameTupleAccessor accessor, int tIndex,
+					int nParts) throws HyracksDataException {
+				return random.nextInt(domainCardinality);
+			}
+		};
+	}
+
+}


Mime
View raw message