flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] git commit: [FLINK-1164] Gracefully handle empty (identity) iterations
Date Sat, 18 Oct 2014 17:45:30 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master a0ad90314 -> 259f10c09


[FLINK-1164] Gracefully handle empty (identity) iterations


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/867e3a57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/867e3a57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/867e3a57

Branch: refs/heads/master
Commit: 867e3a57eb994b704fdabf791eec6123ae165883
Parents: a0ad903
Author: Stephan Ewen <sewen@apache.org>
Authored: Sat Oct 18 01:55:08 2014 +0200
Committer: Robert Metzger <metzgerr@web.de>
Committed: Sat Oct 18 19:35:09 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |  5 +-
 .../flink/compiler/dag/BulkIterationNode.java   |  4 +-
 .../compiler/dag/WorksetIterationNode.java      | 15 +++-
 .../plantranslate/NepheleJobGraphGenerator.java | 10 +++
 .../compiler/java/IterationCompilerTest.java    | 88 ++++++++++++++++++++
 .../iterative/EmptyWorksetIterationITCase.java  | 59 +++++++++++++
 .../test/iterative/IdentityIterationITCase.java | 56 +++++++++++++
 7 files changed, 233 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 177622b..d8792c0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -801,7 +801,8 @@ public class PactCompiler {
 				final BulkIterationNode iterNode = (BulkIterationNode) n;
 				final BulkIterationBase<?> iter = iterNode.getIterationContract();
 
-				// calculate closure of the anonymous function
+				// pass a copy of the non-iterative part into the iteration translation,
+				// in case the iteration references its closure
 				HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>,
OptimizerNode>(con2node);
 
 				// first, recursively build the data flow for the step function
@@ -831,8 +832,8 @@ public class PactCompiler {
 					}
 				}
 				
-				iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
 				iterNode.setPartialSolution(partialSolution);
+				iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
 				
 				// go over the contained data flow and mark the dynamic path nodes
 				StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
index 91e9eda..a5f8026 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
@@ -130,7 +130,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion)
{
 		
 		// check if the root of the step function has the same DOP as the iteration
-		if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism())
+		// or if the step function has no operator at all (its root is the partial solution itself)
+		if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
+			nextPartialSolution == partialSolution)
 		{
 			// add a no-op to the root to express the re-partitioning
 			NoOpNode noop = new NoOpNode();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index 2cd8294..7638cca 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -158,6 +158,19 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 			}
 		}
 		
+		// there needs to be at least one node in the workset path, so
+		// if the next workset is equal to the workset, we need to inject a no-op node
+		if (nextWorkset == worksetNode) {
+			NoOpNode noop = new NoOpNode();
+			noop.setDegreeOfParallelism(getDegreeOfParallelism());
+
+			PactConnection noOpConn = new PactConnection(nextWorkset, noop);
+			noop.setIncomingConnection(noOpConn);
+			nextWorkset.addOutgoingConnection(noOpConn);
+			
+			nextWorkset = noop;
+		}
+		
 		// attach an extra node to the solution set delta for the cases where we need to repartition
 		UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta",
getSolutionSetKeyFields(),
 				new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
@@ -367,7 +380,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 			return;
 		}
 		
-		// sanity check the solution set delta and cancel out the delta node, if it is not needed
+		// sanity check the solution set delta
 		for (Iterator<PlanNode> deltaPlans = solutionSetDeltaCandidates.iterator(); deltaPlans.hasNext();
) {
 			SingleInputPlanNode candidate = (SingleInputPlanNode) deltaPlans.next();
 			GlobalProperties gp = candidate.getGlobalProperties();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index dbce56b..3dd9685 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1129,6 +1129,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		// ------------ finalize the head config with the final outputs and the sync gate ------------
 		final int numStepFunctionOuts = headConfig.getNumOutputs();
 		final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+		
+		if (numStepFunctionOuts == 0) {
+			throw new CompilerException("The iteration has no operation inside the step function.");
+		}
+		
 		headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
 		headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
 		final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
@@ -1250,6 +1255,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		{
 			final int numStepFunctionOuts = headConfig.getNumOutputs();
 			final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+			
+			if (numStepFunctionOuts == 0) {
+				throw new CompilerException("The workset iteration has no operation on the workset inside
the step function.");
+			}
+			
 			headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
 			headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
 			final double relativeMemory = iterNode.getRelativeMemoryPerSubTask();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
new file mode 100644
index 0000000..c3fbbf2
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+/**
+ * Compiler tests for iterations whose step function contains no operator
+ * (identity iterations). Both plans must make it through the optimizer and
+ * the job graph generator without an exception.
+ */
+@SuppressWarnings("serial")
+public class IterationCompilerTest extends CompilerTestBase {
+
+	/**
+	 * A bulk iteration that is closed with its own partial solution.
+	 */
+	@Test
+	public void testIdentityIteration() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(43);
+		
+		IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
+		iteration.closeWith(iteration).print();
+		
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+		
+		// throws a CompilerException if the step function ended up without any operator
+		new NepheleJobGraphGenerator().compileJobGraph(op);
+	}
+	
+	/**
+	 * A delta iteration whose solution set delta and next workset are both the
+	 * unmodified workset.
+	 */
+	@Test
+	public void testIdentityWorksetIteration() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(43);
+		
+		DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
+				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+					@Override
+					public Tuple2<Long, Long> map(Long value) {
+						return new Tuple2<Long, Long>(value, value);
+					}
+				});
+		
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
+		iter.closeWith(iter.getWorkset(), iter.getWorkset())
+			.print();
+		
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+		
+		// throws a CompilerException if the workset path ended up without any operator
+		new NepheleJobGraphGenerator().compileJobGraph(op);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
new file mode 100644
index 0000000..3f02064
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/EmptyWorksetIterationITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Runs a delta iteration whose solution set delta and next workset are both the
+ * unmodified workset, so that the step function contains no operator.
+ */
+@SuppressWarnings("serial")
+public class EmptyWorksetIterationITCase extends JavaProgramTestBase {
+	
+	private static final int NUM_ELEMENTS = 20;
+	
+	private static final int MAX_ITERATIONS = 20;
+	
+	private List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long, Long>>();
+	
+	@Override
+	protected void testProgram() throws Exception {
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, NUM_ELEMENTS).map(new Dupl());
+		
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, MAX_ITERATIONS, 0);
+		iter.closeWith(iter.getWorkset(), iter.getWorkset())
+			.output(new LocalCollectionOutputFormat<Tuple2<Long, Long>>(result));
+		
+		env.execute();
+	}
+	
+	@Override
+	protected void postSubmit() {
+		// the delta only re-inserts the initial elements, so the final solution set must
+		// equal the initial one: one (i, i) pair for every i in [1, NUM_ELEMENTS]
+		assertEquals(NUM_ELEMENTS, result.size());
+		
+		long sum = 0;
+		for (Tuple2<Long, Long> t : result) {
+			assertEquals(t.f0, t.f1);
+			sum += t.f0;
+		}
+		assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1) / 2, sum);
+	}
+
+	/** Maps a number {@code x} to the pair {@code (x, x)}. */
+	public static final class Dupl implements MapFunction<Long, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> map(Long value) {
+			return new Tuple2<Long, Long>(value, value);
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/867e3a57/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
new file mode 100644
index 0000000..c422aa4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IdentityIterationITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Runs a bulk iteration that is closed with its own partial solution, so that the
+ * step function contains no operator, and checks the element count and sum of the output.
+ */
+public class IdentityIterationITCase extends JavaProgramTestBase {
+
+	/** Collects the elements emitted by the iteration. */
+	private List<Long> result = new ArrayList<Long>();
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// closing the iteration with its own partial solution leaves the step function empty
+		IterativeDataSet<Long> identity = env.generateSequence(1, 10).iterate(100);
+		identity.closeWith(identity)
+			.output(new LocalCollectionOutputFormat<Long>(result));
+		
+		env.execute();
+	}
+	
+	@Override
+	protected void postSubmit() {
+		// the sequence 1..10 must pass through unchanged: ten elements summing up to 55
+		assertEquals(10, result.size());
+		
+		long total = 0;
+		for (Long value : result) {
+			total += value;
+		}
+		assertEquals(55, total);
+	}
+}


Mime
View raw message