flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject git commit: [FLINK-1210] Improve error message in delta iterations when the next workset does not depend on the workset
Date Tue, 04 Nov 2014 15:43:30 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 9b6561d4e -> 233161b25


[FLINK-1210] Improve error message in delta iterations when the next workset does not depend
on the workset


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/233161b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/233161b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/233161b2

Branch: refs/heads/master
Commit: 233161b25eecebb94301f4a1ff06de94940b805b
Parents: 9b6561d
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Nov 4 15:33:31 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Nov 4 16:42:41 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java | 35 ++++++++-
 .../java/DeltaIterationDependenciesTest.java    | 76 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/233161b2/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index d8792c0..15aac32 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -846,12 +846,22 @@ public class PactCompiler {
 				final WorksetIterationNode iterNode = (WorksetIterationNode) n;
 				final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();
 
+				// we need to ensure that both the next-workset and the solution-set-delta depend on the workset. One check is for free
+				// during the translation, we do the other check here as a pre-condition
+				{
+					WorksetFinder wsf = new WorksetFinder();
+					iter.getNextWorkset().accept(wsf);
+					if (!wsf.foundWorkset) {
+						throw new CompilerException("In the given program, the next workset does not depend on the workset. This is a prerequisite in delta iterations.");
+					}
+				}
+				
 				// calculate the closure of the anonymous function
 				HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
 
 				// first, recursively build the data flow for the step function
-				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
-					iterNode.getDegreeOfParallelism(), closure);
+				final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, iterNode.getDegreeOfParallelism(), closure);
+				
 				// descend from the solution set delta. check that it depends on both the workset
 				// and the solution set. If it does depend on both, this descend should create both nodes
 				iter.getSolutionSetDelta().accept(recursiveCreator);
@@ -859,7 +869,7 @@ public class PactCompiler {
 				final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
 				
 				if (worksetNode == null) {
-					throw new CompilerException("In the given plan, the solution set delta does not depend on the workset. This is a prerequisite in delta iterations.");
+					throw new CompilerException("In the given program, the solution set delta does not depend on the workset. This is a prerequisite in delta iterations.");
 				}
 				
 				iter.getNextWorkset().accept(recursiveCreator);
@@ -1284,6 +1294,25 @@ public class PactCompiler {
 			}
 		}
 	}
+	
+	private static final class WorksetFinder implements Visitor<Operator<?>> {
+
+		private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
+		
+		private boolean foundWorkset;
+		
+		@Override
+		public boolean preVisit(Operator<?> visitable) {
+			if (visitable instanceof WorksetPlaceHolder) {
+				foundWorkset = true;
+			}
+			
+			return (!foundWorkset) && seenBefore.add(visitable);
+		}
+
+		@Override
+		public void postVisit(Operator<?> visitable) {}
+	}
 
 	// ------------------------------------------------------------------------
 	// Miscellaneous

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/233161b2/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
new file mode 100644
index 0000000..2ef016b
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerException;
+import org.apache.flink.compiler.CompilerTestBase;
+
+
+@SuppressWarnings({"serial", "unchecked"})
+public class DeltaIterationDependenciesTest extends CompilerTestBase {
+
+	@Test
+	public void testExceptionWhenNewWorksetNotDependentOnWorkset() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration = input.iterateDelta(input, 10,0);
+
+			DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset())
+														.where(0).equalTo(0)
+														.projectFirst(1).projectSecond(1).types(Long.class, Long.class);
+
+			DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input)
+														.where(0).equalTo(0)
+														.projectFirst(1).projectSecond(1).types(Long.class, Long.class);
+			
+
+			DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
+
+			result.print();
+			
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("Should not be able to compile, since the next workset does not depend on the workset");
+			}
+			catch (CompilerException e) {
+				// good
+			}
+			catch (Exception e) {
+				fail("wrong exception type");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


Mime
View raw message