Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 41AD517686 for ; Fri, 13 Mar 2015 13:22:04 +0000 (UTC) Received: (qmail 70781 invoked by uid 500); 13 Mar 2015 13:22:04 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 70750 invoked by uid 500); 13 Mar 2015 13:22:04 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 70734 invoked by uid 99); 13 Mar 2015 13:22:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2015 13:22:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0BD06E0B2D; Fri, 13 Mar 2015 13:22:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: fhueske@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: =?utf-8?q?flink_git_commit=3A_=5BFLINK-1683=5D_=5Bjobmanager=5D?= =?utf-8?q?=C2=A0Fix_scheduling_preference_choice_for_non-unary_execution_ta?= =?utf-8?q?sks=2E?= Date: Fri, 13 Mar 2015 13:22:04 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 81ebe980a -> 07a9a56c1 [FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks. This closes #476 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a9a56c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a9a56c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a9a56c Branch: refs/heads/master Commit: 07a9a56c1726b4a7aeb3e682b887d77ab1b0e440 Parents: 81ebe98 Author: Fabian Hueske Authored: Tue Mar 10 19:19:27 2015 +0100 Committer: Fabian Hueske Committed: Fri Mar 13 14:20:08 2015 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 28 ++++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/07a9a56c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 41b78f8..794ca21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkElementIndex; @@ -348,22 +349,39 @@ public class ExecutionVertex implements Serializable { return Collections.emptySet(); } else { - HashSet locations = new HashSet(); - + + Set locations = new HashSet(); + Set inputLocations = new HashSet(); + + // go over all inputs for (int i = 0; i < inputEdges.length; i++) { + inputLocations.clear(); ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { + // go over all input sources for (int k = 0; k < sources.length; k++) { + // look-up assigned slot of input source SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); if (sourceSlot != null) { - locations.add(sourceSlot.getInstance()); - if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - return null; + // add input location + inputLocations.add(sourceSlot.getInstance()); + // inputs which have too many distinct sources are not considered + if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { + inputLocations.clear(); + break; } } } } + // keep the locations of the input with the least preferred locations + if(locations.isEmpty() || // nothing assigned yet + (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { + // current input has fewer preferred locations + locations.clear(); + locations.addAll(inputLocations); + } } + return locations; } }