flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.
Date Fri, 20 Jan 2017 16:38:54 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 f6f1c244c -> 6566b63aa


[FLINK-2662] [optimizer] Fix translation of broadcasted unions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6566b63a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6566b63a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6566b63a

Branch: refs/heads/release-1.1
Commit: 6566b63aa50af946ecf0d028c4e284fc6dc2a55b
Parents: f6f1c24
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Jan 6 00:00:30 2017 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Jan 20 17:35:44 2017 +0100

----------------------------------------------------------------------
 .../flink/optimizer/dag/BinaryUnionNode.java    | 35 ++++++++++
 .../operators/BinaryUnionOpDescriptor.java      |  4 ++
 .../flink/optimizer/UnionReplacementTest.java   | 72 ++++++++++++++++++--
 3 files changed, 106 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index d262cf6..cb496a2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode {
 			throw new CompilerException("BinaryUnionNode has more than one successor.");
 		}
 
+		boolean childrenSkippedDueToReplicatedInput = false;
+
 		// check if we have a cached version
 		if (this.cachedPlans != null) {
 			return this.cachedPlans;
@@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode {
 		
 		// create all candidates
 		for (PlanNode child1 : subPlans1) {
+
+			if (child1.getGlobalProperties().isFullyReplicated()) {
+				// fully replicated input is always locally forwarded if parallelism is not changed
+				if (dopChange1) {
+					// can not continue with this child
+					childrenSkippedDueToReplicatedInput = true;
+					continue;
+				} else {
+					this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+				}
+			}
+
 			for (PlanNode child2 : subPlans2) {
+
+				if (child2.getGlobalProperties().isFullyReplicated()) {
+					// fully replicated input is always locally forwarded if parallelism is not changed
+					if (dopChange2) {
+						// can not continue with this child
+						childrenSkippedDueToReplicatedInput = true;
+						continue;
+					} else {
+						this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+					}
+				}
 				
 				// check that the children go together. that is the case if they build upon the same
 				// candidate at the joined branch plan. 
@@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode {
 			}
 		}
 
+		if(outputPlans.isEmpty()) {
+			if(childrenSkippedDueToReplicatedInput) {
+				throw new CompilerException("No plan meeting the requirements could be created @ " + this
+					+ ". Most likely reason: Invalid use of replicated input.");
+			} else {
+				throw new CompilerException("No plan meeting the requirements could be created @ " + this
+					+ ". Most likely reason: Too restrictive plan hints.");
+			}
+		}
+
 		// cost and prune the plans
 		for (PlanNode node : outputPlans) {
 			estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
index 78ac3d6..acfb229 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
@@ -97,6 +97,10 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
 					in2.getPartitioning() == PartitioningProperty.FORCED_REBALANCED) {
 			newProps.setForcedRebalanced();
 		}
+		else if (in1.getPartitioning() == PartitioningProperty.FULL_REPLICATION &&
+			in2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) {
+			newProps.setFullyReplicated();
+		}
 
 		return newProps;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index d0bb376..be6804b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.optimizer;
 
-import junit.framework.Assert;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -36,7 +36,6 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -363,11 +362,11 @@ public class UnionReplacementTest extends CompilerTestBase {
 			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
 
 		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
-		// all union inputs should be force rebalanced
+		// all union inputs should be range partitioned
 		for (Channel c : union.getInputs()) {
-			assertEquals("Union input should be force rebalanced",
+			assertEquals("Union input should be range partitioned",
 				PartitioningProperty.RANGE_PARTITIONED, c.getGlobalProperties().getPartitioning());
-			assertEquals("Union input channel should be rebalancing",
+			assertEquals("Union input channel should be forwarded",
 				ShipStrategyType.FORWARD, c.getShipStrategy());
 			// range partitioning is executed as custom partitioning with prior sampling
 			SingleInputPlanNode partitionMap = (SingleInputPlanNode)c.getSource();
@@ -376,4 +375,67 @@ public class UnionReplacementTest extends CompilerTestBase {
 		}
 	}
 
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by broadcast-fwd JOIN is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 --> bc-fwd-Join -> Output
+	 * Src3 ----------------/             /
+	 *                                   /
+	 * Src4 ----------------------------/
+	 *
+	 * In the resulting plan, the broadcasting must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithBroadcast() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src4 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+		union123.join(src4, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
+			.where(0).equalTo(0).name("join")
+			.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		DualInputPlanNode join = resolver.getNode("join");
+
+		// check input of join is broadcasted
+		assertEquals("First join input should be fully replicated.",
+			PartitioningProperty.FULL_REPLICATION, join.getInput1().getGlobalProperties().getPartitioning());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)join.getInput1().getSource();
+		// check that all union inputs are broadcasted
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be fully replicated",
+				PartitioningProperty.FULL_REPLICATION, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be broadcasting",
+				ShipStrategyType.BROADCAST, c.getShipStrategy());
+		}
+	}
+
 }


Mime
View raw message