flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [18/22] git commit: Streamlined job graph algorithms to get rid of linear contains operations.
Date Sun, 22 Jun 2014 21:47:39 GMT
Streamlined job graph algorithms to get rid of linear contains operations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e52fcf90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e52fcf90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e52fcf90

Branch: refs/heads/master
Commit: e52fcf90c37f921f50cd75dfcb7960d2f37c5e74
Parents: fba44a9
Author: Till Rohrmann <till.rohrmann@gmail.com>
Authored: Wed Apr 9 16:18:44 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jun 22 21:07:20 2014 +0200

----------------------------------------------------------------------
 .../stratosphere/nephele/jobgraph/JobGraph.java | 120 +++++++------------
 1 file changed, 45 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e52fcf90/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
index 804a258..f048b0d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
@@ -92,6 +92,11 @@ public class JobGraph implements IOReadableWritable {
 	private static final int BUFFERSIZE = 8192;
 
 	/**
+	 * Buffer for array of reachable job vertices
+	 */
+	private volatile AbstractJobVertex[] bufferedAllReachableJobVertices = null;
+
+	/**
 	 * Constructs a new job graph with a random job ID.
 	 */
 	public JobGraph() {
@@ -253,14 +258,51 @@ public class JobGraph implements IOReadableWritable {
 
 	/**
 	 * Returns an array of all job vertices than can be reached when traversing the job graph
from the input vertices.
+	 * Each job vertex is contained only one time.
 	 * 
 	 * @return an array of all job vertices than can be reached when traversing the job graph
from the input vertices
 	 */
 	public AbstractJobVertex[] getAllReachableJobVertices() {
+		if(bufferedAllReachableJobVertices == null){
+			final List<AbstractJobVertex> collector = new ArrayList<AbstractJobVertex>();
+			final HashSet<JobVertexID> visited = new HashSet<JobVertexID>();
+
+			final Iterator<AbstractJobInputVertex> inputs = getInputVertices();
+
+			while(inputs.hasNext()){
+				AbstractJobVertex vertex = inputs.next();
+
+				if(!visited.contains(vertex.getID())){
+					collectVertices(vertex, visited, collector);
+				}
+			}
+
+			bufferedAllReachableJobVertices = collector.toArray(new AbstractJobVertex[0]);
+		}
+
+		return bufferedAllReachableJobVertices;
+	}
+
+	/**
+	 * Auxiliary method to collect all vertices which are reachable from the input vertices.
+	 *
+	 * @param jv
+	 *        the currently considered job vertex
+	 * @param collector
+	 *        a temporary list to store the vertices that have already been visisted
+	 */
+	private void collectVertices(final AbstractJobVertex jv, final HashSet<JobVertexID>
visited, final
+			List<AbstractJobVertex> collector) {
+		visited.add(jv.getID());
+		collector.add(jv);
 
-		final Vector<AbstractJobVertex> collector = new Vector<AbstractJobVertex>();
-		collectVertices(null, collector);
-		return collector.toArray(new AbstractJobVertex[0]);
+		for(int i =0; i < jv.getNumberOfForwardConnections(); i++){
+			AbstractJobVertex vertex = jv.getForwardConnection(i).getConnectedVertex();
+
+			if(!visited.contains(vertex.getID())){
+				collectVertices(vertex, visited, collector);
+			}
+		}
 	}
 
 	/**
@@ -293,34 +335,6 @@ public class JobGraph implements IOReadableWritable {
 		return vertices;
 	}
 
-	/**
-	 * Auxiliary method to collect all vertices which are reachable from the input vertices.
-	 * 
-	 * @param jv
-	 *        the currently considered job vertex
-	 * @param collector
-	 *        a temporary list to store the vertices that have already been visisted
-	 */
-	private void collectVertices(final AbstractJobVertex jv, final List<AbstractJobVertex>
collector) {
-
-		if (jv == null) {
-			final Iterator<AbstractJobInputVertex> iter = getInputVertices();
-			while (iter.hasNext()) {
-				collectVertices(iter.next(), collector);
-			}
-		} else {
-
-			if (!collector.contains(jv)) {
-				collector.add(jv);
-			} else {
-				return;
-			}
-
-			for (int i = 0; i < jv.getNumberOfForwardConnections(); i++) {
-				collectVertices(jv.getForwardConnection(i).getConnectedVertex(), collector);
-			}
-		}
-	}
 
 	/**
 	 * Returns the ID of the job.
@@ -356,31 +370,6 @@ public class JobGraph implements IOReadableWritable {
 	}
 
 	/**
-	 * Checks if the job vertex with the given ID is registered with the job graph.
-	 * 
-	 * @param id
-	 *        the ID of the vertex to search for
-	 * @return <code>true</code> if a vertex with the given ID is registered with
the job graph, <code>false</code>
-	 *         otherwise.
-	 */
-	private boolean includedInJobGraph(final JobVertexID id) {
-
-		if (this.inputVertices.containsKey(id)) {
-			return true;
-		}
-
-		if (this.outputVertices.containsKey(id)) {
-			return true;
-		}
-
-		if (this.taskVertices.containsKey(id)) {
-			return true;
-		}
-
-		return false;
-	}
-
-	/**
 	 * Checks if the job graph is weakly connected.
 	 * 
 	 * @return <code>true</code> if the job graph is weakly connected, otherwise
<code>false</code>
@@ -395,25 +384,6 @@ public class JobGraph implements IOReadableWritable {
 			return false;
 		}
 
-		final HashMap<JobVertexID, AbstractJobVertex> tmp = new HashMap<JobVertexID, AbstractJobVertex>();
-		for (int i = 0; i < reachable.length; i++) {
-			tmp.put(reachable[i].getID(), reachable[i]);
-		}
-
-		// Check if all is subset of reachable
-		for (int i = 0; i < all.length; i++) {
-			if (!tmp.containsKey(all[i].getID())) {
-				return false;
-			}
-		}
-
-		// Check if reachable is a subset of all
-		for (int i = 0; i < reachable.length; i++) {
-			if (!includedInJobGraph(reachable[i].getID())) {
-				return false;
-			}
-		}
-
 		return true;
 	}
 


Mime
View raw message