flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/8] flink git commit: [FLINK-7] Prevent range partitioning inside iterations.
Date Mon, 21 Dec 2015 17:44:06 GMT
[FLINK-7] Prevent range partitioning inside iterations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6a05288
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6a05288
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6a05288

Branch: refs/heads/master
Commit: a6a052884718c52c843ad1ade1b42b1155115a46
Parents: f5957ce
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Dec 16 17:02:14 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Dec 21 16:43:33 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/functions/IdPartitioner.java | 32 ++++++++++++++++
 .../traversals/RangePartitionRewriter.java      | 40 ++++++++++++--------
 .../test/javaApiOperators/PartitionITCase.java  | 30 +++++++++++++++
 3 files changed, 86 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6a05288/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
new file mode 100644
index 0000000..a50ded1
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.Partitioner;
+
+public class IdPartitioner implements Partitioner<Integer> {
+
+	private static final long serialVersionUID = -1206233785103357568L;
+
+	@Override
+	public int partition(Integer key, int numPartitions) {
+		return key;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6a05288/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java
index 1110f23..7656dfd 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.optimizer.traversals;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.distributions.CommonRangeBoundaries;
-import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -29,9 +29,11 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.java.functions.IdPartitioner;
 import org.apache.flink.optimizer.costs.Costs;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.udf.AssignRangeIndex;
 import org.apache.flink.runtime.operators.udf.RemoveRangeIndex;
@@ -57,7 +59,9 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Visitor;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  *
@@ -76,9 +80,11 @@ public class RangePartitionRewriter implements Visitor<PlanNode> {
 	final static IdPartitioner idPartitioner = new IdPartitioner();
 
 	final OptimizedPlan plan;
+	final Set<IterationPlanNode> visitedIterationNodes;
 
 	public RangePartitionRewriter(OptimizedPlan plan) {
 		this.plan = plan;
+		this.visitedIterationNodes = new HashSet<>();
 	}
 
 	@Override
@@ -87,12 +93,26 @@ public class RangePartitionRewriter implements Visitor<PlanNode> {
 	}
 
 	@Override
-	public void postVisit(PlanNode visitable) {
-		final Iterable<Channel> inputChannels = visitable.getInputs();
+	public void postVisit(PlanNode node) {
+
+		if(node instanceof IterationPlanNode) {
+			IterationPlanNode iNode = (IterationPlanNode)node;
+			if(!visitedIterationNodes.contains(iNode)) {
+				visitedIterationNodes.add(iNode);
+				iNode.acceptForStepFunction(this);
+			}
+		}
+
+		final Iterable<Channel> inputChannels = node.getInputs();
 		for (Channel channel : inputChannels) {
 			ShipStrategyType shipStrategy = channel.getShipStrategy();
 			// Make sure we only optimize the DAG for range partition, and do not optimize multi times.
-			if (shipStrategy == ShipStrategyType.PARTITION_RANGE && isOptimized(visitable)) {
+			if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {
+
+				if(node.isOnDynamicPath()) {
+					throw new InvalidProgramException("Range Partitioning not supported within iterations.");
+				}
+
 				PlanNode channelSource = channel.getSource();
 				List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
 				channelSource.getOutgoingChannels().remove(channel);
@@ -214,16 +234,4 @@ public class RangePartitionRewriter implements Visitor<PlanNode> {
 		return sourceNewOutputChannels;
 	}
 
-
-	private boolean isOptimized(PlanNode node) {
-		return node.getNodeName() != PR_NAME;
-	}
-
-	static class IDPartitioner implements Partitioner<Integer> {
-
-		@Override
-		public int partition(Integer key, int numPartitions) {
-			return key;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6a05288/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index 693dda4..7dc418e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
@@ -33,6 +34,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -451,6 +453,34 @@ public class PartitionITCase extends MultipleProgramsTestBase {
 		}
 	}
 
+	@Test(expected = InvalidProgramException.class)
+	public void testRangePartitionInIteration() throws Exception {
+
+		// does not apply for collection execution
+		if (super.mode == TestExecutionMode.COLLECTION) {
+			throw new InvalidProgramException("Does not apply for collection execution");
+		}
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSource<Long> source = env.generateSequence(0, 10000);
+
+		DataSet<Tuple2<Long, String>> tuples = source.map(new MapFunction<Long, Tuple2<Long, String>>() {
+			@Override
+			public Tuple2<Long, String> map(Long v) throws Exception {
+				return new Tuple2<>(v, Long.toString(v));
+			}
+		});
+
+		DeltaIteration<Tuple2<Long, String>, Tuple2<Long, String>> it = tuples.iterateDelta(tuples, 10, 0);
+		DataSet<Tuple2<Long, String>> body = it.getWorkset()
+			.partitionByRange(1) // Verify that range partition is not allowed in iteration
+			.join(it.getSolutionSet())
+			.where(0).equalTo(0).projectFirst(0).projectSecond(1);
+		DataSet<Tuple2<Long, String>> result = it.closeWith(body, body);
+
+		result.collect(); // should fail
+	}
+
 	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
 		@Override
 		public Long getKey(Long value) throws Exception {


Mime
View raw message