flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [13/53] [abbrv] git commit: Replaced Tarjan's algorithm with a simpler depth-first traversal cycle detection algorithm. By doing so, one gets rid off a possible linear time check whether a node is contained in the stack.
Date Thu, 26 Jun 2014 09:46:38 GMT
Replaced Tarjan's algorithm with a simpler depth-first traversal cycle detection algorithm.
By doing so, one gets rid off a possible linear time check whether a node is contained in
the stack.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fba44a94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fba44a94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fba44a94

Branch: refs/heads/travis_test
Commit: fba44a94b9e3dd7f87c391278f17ae55393dd51b
Parents: 93bc0b9
Author: Till Rohrmann <till.rohrmann@gmail.com>
Authored: Wed Apr 9 14:32:55 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jun 22 21:07:20 2014 +0200

----------------------------------------------------------------------
 .../stratosphere/nephele/jobgraph/JobGraph.java | 76 ++++++++------------
 1 file changed, 28 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fba44a94/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
index 41fd907..804a258 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
@@ -424,18 +424,14 @@ public class JobGraph implements IOReadableWritable {
 	 */
 	public boolean isAcyclic() {
 
-		// Tarjan's algorithm to detect strongly connected componenent of a graph
 		final AbstractJobVertex[] reachable = getAllReachableJobVertices();
-		final HashMap<AbstractJobVertex, Integer> indexMap = new HashMap<AbstractJobVertex,
Integer>();
-		final HashMap<AbstractJobVertex, Integer> lowLinkMap = new HashMap<AbstractJobVertex,
Integer>();
-		final Stack<AbstractJobVertex> stack = new Stack<AbstractJobVertex>();
-		final Integer index = Integer.valueOf(0);
 
-		for (int i = 0; i < reachable.length; i++) {
-			if (!indexMap.containsKey(reachable[i])) {
-				if (!tarjan(reachable[i], index, indexMap, lowLinkMap, stack)) {
-					return false;
-				}
+		final HashSet<JobVertexID> temporarilyMarked = new HashSet<JobVertexID>();
+		final HashSet<JobVertexID> permanentlyMarked = new HashSet<JobVertexID>();
+
+		for(int i = 0; i < reachable.length; i++){
+			if(detectCycle(reachable[i], temporarilyMarked, permanentlyMarked)){
+				return false;
 			}
 		}
 
@@ -443,51 +439,35 @@ public class JobGraph implements IOReadableWritable {
 	}
 
 	/**
-	 * Auxiliary method implementing Tarjan's algorithm for strongly-connected components to
determine whether the job
-	 * graph is acyclic.
+	 * Auxiliary method for cycle detection. Performs a depth-first traversal with vertex markings
to detect a cycle.
+	 * If a node with a temporary marking is found, then there is a cycle. Once all children
of a vertex have been
+	 * traversed the parent node cannot be part of another cycle and is thus permanently marked.
+	 *
+	 * @param jv current job vertex to check
+	 * @param temporarilyMarked set of temporarily marked nodes
+	 * @param permanentlyMarked set of permanently marked nodes
+	 * @return <code>true</code> if there is a cycle, <code>false</code>
otherwise
 	 */
-	private boolean tarjan(final AbstractJobVertex jv, Integer index,
-			final HashMap<AbstractJobVertex, Integer> indexMap, final HashMap<AbstractJobVertex,
Integer> lowLinkMap,
-			final Stack<AbstractJobVertex> stack) {
-
-		indexMap.put(jv, Integer.valueOf(index));
-		lowLinkMap.put(jv, Integer.valueOf(index));
-		index = Integer.valueOf(index.intValue() + 1);
-		stack.push(jv);
-
-		for (int i = 0; i < jv.getNumberOfForwardConnections(); i++) {
+	private boolean detectCycle(final AbstractJobVertex jv, final HashSet<JobVertexID>
temporarilyMarked,
+								final HashSet<JobVertexID> permanentlyMarked){
+		JobVertexID vertexID = jv.getID();
 
-			final AbstractJobVertex jv2 = jv.getForwardConnection(i).getConnectedVertex();
-			if (!indexMap.containsKey(jv2) || stack.contains(jv2)) {
-				if (!indexMap.containsKey(jv2)) {
-					if (!tarjan(jv2, index, indexMap, lowLinkMap, stack)) {
-						return false;
-					}
-				}
-				if (lowLinkMap.get(jv) > lowLinkMap.get(jv2)) {
-					lowLinkMap.put(jv, Integer.valueOf(lowLinkMap.get(jv2)));
-				}
-			}
-		}
-
-		if (lowLinkMap.get(jv).equals(indexMap.get(jv))) {
+		if(permanentlyMarked.contains(vertexID)){
+			return false;
+		}else if(temporarilyMarked.contains(vertexID)){
+			return true;
+		}else{
+			temporarilyMarked.add(vertexID);
 
-			int count = 0;
-			while (stack.size() > 0) {
-				final AbstractJobVertex jv2 = stack.pop();
-				if (jv == jv2) {
-					break;
+			for(int i = 0; i < jv.getNumberOfForwardConnections(); i++){
+				if(detectCycle(jv.getForwardConnection(i).getConnectedVertex(), temporarilyMarked, permanentlyMarked)){
+					return true;
 				}
-
-				count++;
 			}
 
-			if (count > 0) {
-				return false;
-			}
+			permanentlyMarked.add(vertexID);
+			return false;
 		}
-
-		return true;
 	}
 
 	/**


Mime
View raw message