flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [51/63] [abbrv] git commit: Fix bug in topological sort
Date Sun, 21 Sep 2014 02:13:15 GMT
Fix bug in topological sort


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f229d5bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f229d5bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f229d5bc

Branch: refs/heads/master
Commit: f229d5bcb9d877cbac5eec4120bd4d9ed4548c8d
Parents: 8231b62
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Sep 16 19:48:02 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Sep 20 20:02:50 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/jobgraph/JobGraph.java |  4 ++
 .../flink/runtime/jobgraph/JobGraphTest.java    | 39 ++++++++++++++++++++
 2 files changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f229d5bc/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 85978fe..dc6eb04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -280,6 +280,10 @@ public class JobGraph implements IOReadableWritable {
 				
 				// a vertex can be added, if it has no predecessors that are still in the 'remaining'
set
 				AbstractJobVertex v = edge.getTarget();
+				if (!remaining.contains(v)) {
+					continue;
+				}
+				
 				boolean hasNewPredecessors = false;
 				
 				for (JobEdge e : v.getInputs()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f229d5bc/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 768ac82..18b16e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -94,6 +94,8 @@ public class JobGraphTest {
 			JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2,
target1, target2);
 			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
+			assertEquals(6, sorted.size());
+			
 			assertBefore(source1, target1, sorted);
 			assertBefore(source1, target2, sorted);
 			assertBefore(source2, target2, sorted);
@@ -136,6 +138,8 @@ public class JobGraphTest {
 			JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2);
 			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
+			assertEquals(7,  sorted.size());
+			
 			assertBefore(source1, root, sorted);
 			assertBefore(source2, root, sorted);
 			assertBefore(l11, root, sorted);
@@ -159,6 +163,41 @@ public class JobGraphTest {
 	}
 	
 	@Test
+	public void testTopologicalSort3() {
+		//             --> op1 --
+		//            /         \
+		//  (source) -           +-> op2 -> op3
+		//            \         /
+		//             ---------
+		
+		try {
+			AbstractJobVertex source = new AbstractJobVertex("source");
+			AbstractJobVertex op1 = new AbstractJobVertex("op4");
+			AbstractJobVertex op2 = new AbstractJobVertex("op2");
+			AbstractJobVertex op3 = new AbstractJobVertex("op3");
+			
+			op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+			op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE);
+			op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+			op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE);
+			
+			JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
+			List<AbstractJobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
+			
+			assertEquals(4,  sorted.size());
+			
+			assertBefore(source, op1, sorted);
+			assertBefore(source, op2, sorted);
+			assertBefore(op1, op2, sorted);
+			assertBefore(op2, op3, sorted);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
 	public void testTopoSortCyclicGraphNoSources() {
 		try {
 			AbstractJobVertex v1 = new AbstractJobVertex("1");


Mime
View raw message