flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [78/92] [abbrv] git commit: [FLINK-1018] Fix cross pipelining/damming info to resolve cross-related streaming deadlocks.
Date Tue, 22 Jul 2014 10:41:18 GMT
[FLINK-1018] Fix cross pipelining/damming info to resolve cross-related streaming deadlocks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3002258f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3002258f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3002258f

Branch: refs/heads/travis_test
Commit: 3002258f8a22a8adbdb230e57c972ad17910debf
Parents: ec0b00d
Author: Stephan Ewen <sewen@apache.org>
Authored: Sat Jul 12 15:57:22 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Jul 12 19:31:26 2014 +0200

----------------------------------------------------------------------
 .../flink/compiler/PipelineBreakerTest.java     | 103 ++++++++++++++++++-
 .../flink/runtime/operators/DriverStrategy.java |   8 +-
 2 files changed, 106 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3002258f/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
index 4e43a74..45bf729 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
@@ -23,12 +23,13 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SinkPlanNode;
-import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+import org.apache.flink.configuration.Configuration;
 
 @SuppressWarnings("serial")
 public class PipelineBreakerTest extends CompilerTestBase {
@@ -134,4 +135,104 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testPilelineBreakerWithCross() {
+		try {
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+			}
+			
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+			}
+			
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+			}
+			
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3002258f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 3bf6c01..5f00277 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -80,13 +80,13 @@ public enum DriverStrategy {
 	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING,
FULL_DAM, true),
 	
 	// the second input is inner loop, the first input is outer loop and block-wise processed
-	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
+	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, false),
 	// the first input is inner loop, the second input is outer loop and block-wise processed
-	NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
+	NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, false),
 	// the second input is inner loop, the first input is outer loop and stream-processed
-	NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, MATERIALIZING, false),
+	NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, false),
 	// the first input is inner loop, the second input is outer loop and stream-processed
-	NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false),
+	NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, false),
 	
 	// union utility op. unions happen implicitly on the network layer (in the readers) when
bundeling streams
 	UNION(null, null, FULL_DAM, FULL_DAM, false);


Mime
View raw message