flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject incubator-flink git commit: [FLINK-1311] [optimizer] Correctly tag static/dynamic path for auxiliary nodes in iterations.
Date Tue, 09 Dec 2014 14:45:13 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master f945e2c9c -> 94c8e3fa9


[FLINK-1311] [optimizer] Correctly tag static/dynamic path for auxiliary nodes in iterations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/94c8e3fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/94c8e3fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/94c8e3fa

Branch: refs/heads/master
Commit: 94c8e3fa9086d847aac0cd75fddbc3b5a797b474
Parents: f945e2c
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Dec 9 11:59:51 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Dec 9 12:05:21 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |  8 +---
 .../compiler/java/IterationCompilerTest.java    | 45 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94c8e3fa/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index bec264d..a63cfd1 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -837,10 +837,7 @@ public class PactCompiler {
 				
 				// go over the contained data flow and mark the dynamic path nodes
 				StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-				rootOfStepFunction.accept(identifier);
-				if(terminationCriterion != null){
-					terminationCriterion.accept(identifier);
-				}
+				iterNode.acceptForStepFunction(identifier);
 			}
 			else if (n instanceof WorksetIterationNode) {
 				final WorksetIterationNode iterNode = (WorksetIterationNode) n;
@@ -919,8 +916,7 @@ public class PactCompiler {
 				
 				// go over the contained data flow and mark the dynamic path nodes
 				StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-				nextWorksetNode.accept(pathIdentifier);
-				iterNode.getSolutionSetDelta().accept(pathIdentifier);
+				iterNode.acceptForStepFunction(pathIdentifier);
 			}
 		}
 	};

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94c8e3fa/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
index 36e5739..5f1b760 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/IterationCompilerTest.java
@@ -28,7 +28,12 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.BulkIterationPlanNode;
+import org.apache.flink.compiler.plan.NAryUnionPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.junit.Test;
@@ -99,6 +104,22 @@ public class IterationCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			BulkIterationPlanNode iterNode = (BulkIterationPlanNode) sink.getInput().getSource();
+			
+			// make sure that the root is part of the dynamic path
+			
+			// the "NoOp" that comes after the union.
+			SingleInputPlanNode noop = (SingleInputPlanNode) iterNode.getRootOfStepFunction();
+			NAryUnionPlanNode union = (NAryUnionPlanNode) noop.getInput().getSource();
+			
+			assertTrue(noop.isOnDynamicPath());
+			assertTrue(noop.getCostWeight() >= 1);
+			
+			assertTrue(union.isOnDynamicPath());
+			assertTrue(union.getCostWeight() >= 1);
+			
+			// see that the jobgraph generator can translate this
 			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
@@ -134,6 +155,30 @@ public class IterationCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) sink.getInput().getSource();
+			
+			// make sure that the root is part of the dynamic path
+			
+			// the "NoOp"s that come after the union.
+			SingleInputPlanNode nextWorksetNoop = (SingleInputPlanNode) iterNode.getNextWorkSetPlanNode();
+			SingleInputPlanNode solutionDeltaNoop = (SingleInputPlanNode) iterNode.getSolutionSetDeltaPlanNode();
+			
+			NAryUnionPlanNode nextWorksetUnion = (NAryUnionPlanNode) nextWorksetNoop.getInput().getSource();
+			NAryUnionPlanNode solutionDeltaUnion = (NAryUnionPlanNode) solutionDeltaNoop.getInput().getSource();
+			
+			assertTrue(nextWorksetNoop.isOnDynamicPath());
+			assertTrue(nextWorksetNoop.getCostWeight() >= 1);
+			
+			assertTrue(solutionDeltaNoop.isOnDynamicPath());
+			assertTrue(solutionDeltaNoop.getCostWeight() >= 1);
+			
+			assertTrue(nextWorksetUnion.isOnDynamicPath());
+			assertTrue(nextWorksetUnion.getCostWeight() >= 1);
+			
+			assertTrue(solutionDeltaUnion.isOnDynamicPath());
+			assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
+			
 			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {


Mime
View raw message