flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks.
Date Fri, 13 Mar 2015 13:22:04 GMT
Repository: flink
Updated Branches:
  refs/heads/master 81ebe980a -> 07a9a56c1


[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks.

This closes #476


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a9a56c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a9a56c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a9a56c

Branch: refs/heads/master
Commit: 07a9a56c1726b4a7aeb3e682b887d77ab1b0e440
Parents: 81ebe98
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Mar 10 19:19:27 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Mar 13 14:20:08 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java | 28 ++++++++++++++++----
 1 file changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07a9a56c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 41b78f8..794ca21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -348,22 +349,39 @@ public class ExecutionVertex implements Serializable {
 			return Collections.emptySet();
 		}
 		else {
-			HashSet<Instance> locations = new HashSet<Instance>();
-		
+
+			Set<Instance> locations = new HashSet<Instance>();
+			Set<Instance> inputLocations = new HashSet<Instance>();
+
+			// go over all inputs
 			for (int i = 0; i < inputEdges.length; i++) {
+				inputLocations.clear();
 				ExecutionEdge[] sources = inputEdges[i];
 				if (sources != null) {
+					// go over all input sources
 					for (int k = 0; k < sources.length; k++) {
+						// look-up assigned slot of input source
 						SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
 						if (sourceSlot != null) {
-							locations.add(sourceSlot.getInstance());
-							if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-								return null;
+							// add input location
+							inputLocations.add(sourceSlot.getInstance());
+							// inputs which have too many distinct sources are not considered
+							if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+								inputLocations.clear();
+								break;
 							}
 						}
 					}
 				}
+				// keep the locations of the input with the least preferred locations
+				if(locations.isEmpty() || // nothing assigned yet
+						(!inputLocations.isEmpty() && inputLocations.size() < locations.size()))
{
+					// current input has fewer preferred locations
+					locations.clear();
+					locations.addAll(inputLocations);
+				}
 			}
+
 			return locations;
 		}
 	}


Mime
View raw message