flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [09/11] git commit: [FLINK-1143] Allow delta iterations that do not join with the solution set
Date Fri, 17 Oct 2014 14:52:36 GMT
[FLINK-1143] Allow delta iterations that do not join with the solution set


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9310703d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9310703d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9310703d

Branch: refs/heads/release-0.7
Commit: 9310703d3cfae47965b5275da0e665a745fd9eba
Parents: 1abadb1
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Oct 13 21:21:54 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Oct 17 16:48:55 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java | 10 ++-
 .../flink/compiler/dag/SolutionSetNode.java     |  6 +-
 .../apache/flink/compiler/dag/TwoInputNode.java |  3 +-
 .../WorksetIterationCornerCasesTest.java        | 77 ++++++++++++++++++
 .../WorksetIterationsJavaApiCompilerTest.java   |  4 +-
 ...terationNotDependingOnSolutionSetITCase.java | 84 ++++++++++++++++++++
 6 files changed, 171 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 6492c5c..177622b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -855,18 +855,20 @@ public class PactCompiler {
 				// and the solution set. If it does depend on both, this descend should create both nodes
 				iter.getSolutionSetDelta().accept(recursiveCreator);
 				
-				final SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
 				final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
 				
 				if (worksetNode == null) {
-					throw new CompilerException("In the given plan, the solution set delta does not depend on the workset. This is a prerequisite in workset iterations.");
+					throw new CompilerException("In the given plan, the solution set delta does not depend on the workset. This is a prerequisite in delta iterations.");
 				}
 				
 				iter.getNextWorkset().accept(recursiveCreator);
 				
+				SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
+				
 				if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
-					throw new CompilerException("Error: The step function does not reference the solution set.");
-				} else {
+					solutionSetNode = new SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
+				}
+				else {
 					for (PactConnection conn : solutionSetNode.getOutgoingConnections()) {
 						OptimizerNode successor = conn.getTarget();
 					

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
index 523525d..ab95f21 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SolutionSetNode.java
@@ -46,11 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
 	// --------------------------------------------------------------------------------------------
 	
 	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-		if (this.cachedPlans != null) {
-			throw new IllegalStateException();
-		} else {
-			this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
-		}
+		this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
 	}
 	
 	public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index 239ca08..ce9214e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -492,8 +492,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 								instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
 								break;
 							} else {
-								// meet, but not co-compatible
-//								throw new CompilerException("Implements to adjust one side to the other!");
+								// cannot use this pair, fall through the loop and try the next one
 							}
 						}
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
new file mode 100644
index 0000000..dfff3e1
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationCornerCasesTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class WorksetIterationCornerCasesTest extends CompilerTestBase {
+
+	@Test
+	public void testWorksetIterationNotDependingOnSolutionSet() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
+			
+			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
+			iteration.closeWith(iterEnd, iterEnd).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+			assertTrue(wipn.getSolutionSetPlanNode().getOutgoingChannels().isEmpty());
+			
+			NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+			jgg.compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
+	
+	private static final class TestMapper<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index ef756d0..048f712 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -102,7 +102,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 	}
 	
 	@Test
-	public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
+	public void testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
 		try {
 			Plan plan = getJavaTestPlan(false, false);
 			
@@ -146,7 +146,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 	}
 	
 	@Test
-	public void testRecordApiWithDirectSoltionSetUpdate() {
+	public void testJavaApiWithDirectSoltionSetUpdate() {
 		try {
 			Plan plan = getJavaTestPlan(true, false);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9310703d/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
new file mode 100644
index 0000000..1701628
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaIterationNotDependingOnSolutionSetITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DeltaIterationNotDependingOnSolutionSetITCase {
+
+	@Test
+	public void testDeltaIterationNotDependingOnSolutionSet() {
+		try {
+			final List<Tuple2<Long, Long>> result = new ArrayList<Tuple2<Long,Long>>();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(1);
+			
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(0, 9).map(new Duplicator<Long>());
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 5, 1);
+			
+			iteration.closeWith(iteration.getWorkset(), iteration.getWorkset().map(new TestMapper()))
+				.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(result));
+			
+			env.execute();
+			
+			boolean[] present = new boolean[50];
+			for (Tuple2<Long, Long> t : result) {
+				present[t.f0.intValue()] = true;
+			}
+			
+			for (int i = 0; i < present.length; i++) {
+				assertTrue(String.format("Missing tuple (%d, %d)", i, i), present[i]);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
+	
+	private static final class TestMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+			return new Tuple2<Long, Long>(value.f0+10, value.f1+10);
+		}
+	}
+}


Mime
View raw message