Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E2061200CEB for ; Sat, 29 Jul 2017 00:58:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E07B116DB39; Fri, 28 Jul 2017 22:58:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 83EDF16DB35 for ; Sat, 29 Jul 2017 00:58:11 +0200 (CEST) Received: (qmail 45169 invoked by uid 500); 28 Jul 2017 22:58:00 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 44045 invoked by uid 99); 28 Jul 2017 22:57:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Jul 2017 22:57:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B229EF3343; Fri, 28 Jul 2017 22:57:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Fri, 28 Jul 2017 22:58:36 -0000 Message-Id: <932ab4c3ae794d5fba879282dc2770d4@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [41/50] [abbrv] hadoop git commit: YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru). archived-at: Fri, 28 Jul 2017 22:58:13 -0000 YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru). Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9e002563 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9e002563 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9e002563 Branch: refs/heads/YARN-2915 Commit: 9e00256300d43d00ed1d1170fe4fc2d382ac7661 Parents: 9ca0086 Author: Subru Krishnan Authored: Wed Jun 21 19:08:47 2017 -0700 Committer: Subru Krishnan Committed: Fri Jul 28 15:45:05 2017 -0700 ---------------------------------------------------------------------- .../federation/policies/RouterPolicyFacade.java | 15 +++++-- .../policies/router/FederationRouterPolicy.java | 18 +++++--- .../policies/router/HashBasedRouterPolicy.java | 22 ++++++++-- .../policies/router/LoadBasedRouterPolicy.java | 7 +++- .../policies/router/PriorityRouterPolicy.java | 7 +++- .../policies/router/RejectRouterPolicy.java | 26 ++++++++---- .../router/UniformRandomRouterPolicy.java | 23 +++++++++-- .../router/WeightedRandomRouterPolicy.java | 11 ++++- .../policies/BaseFederationPoliciesTest.java | 2 +- .../policies/TestRouterPolicyFacade.java | 12 +++--- .../policies/router/BaseRouterPoliciesTest.java | 43 +++++++++++++++++++- .../router/TestHashBasedRouterPolicy.java | 2 +- .../router/TestLoadBasedRouterPolicy.java | 2 +- .../router/TestPriorityRouterPolicy.java | 2 +- .../policies/router/TestRejectRouterPolicy.java | 4 +- .../router/TestUniformRandomRouterPolicy.java | 2 +- .../router/TestWeightedRandomRouterPolicy.java | 2 +- 17 files changed, 157 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 5e31a08..44c1b10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -110,16 +111,22 @@ public class RouterPolicyFacade { * This method provides a wrapper of all policy functionalities for routing . * Internally it manages configuration changes, and policy init/reinit. * - * @param appSubmissionContext the application to route. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. * - * @return the id of the subcluster that will be the "home" for this + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * + * @return the {@link SubClusterId} that will be the "home" for this * application. * * @throws YarnException if there are issues initializing policies, or no * valid sub-cluster id could be found for this app. */ public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // the maps are concurrent, but we need to protect from reset() // reinitialization mid-execution by creating a new reference local to this @@ -186,7 +193,7 @@ public class RouterPolicyFacade { + "and no default specified."); } - return policy.getHomeSubcluster(appSubmissionContext); + return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java index 90ea0a8..9325bd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; @@ -29,16 +31,22 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; public interface FederationRouterPolicy extends ConfigurableFederationPolicy { /** - * Determines the sub-cluster that the user application submision should be + * Determines the sub-cluster that the user application submission should be * routed to. * - * @param appSubmissionContext the context for the app being submitted. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. + * + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. * - * @return the sub-cluster as identified by {@link SubClusterId} to route the - * request to. + * @return the {@link SubClusterId} that will be the "home" for this + * application. * * @throws YarnException if the policy cannot determine a viable subcluster. */ SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException; + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java index e40e87e..257a9fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java @@ -55,19 +55,35 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy { * sub-cluster, as far as the number of active sub-cluster and their names * remain the same. * - * @param appSubmissionContext the context for the app being submitted. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. * - * @return a hash-based chosen subcluster. + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * + * @return a hash-based chosen {@link SubClusterId} that will be the "home" + * for this application. * * @throws YarnException if there are no active subclusters. */ + @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // throws if no active subclusters available Map activeSubclusters = getActiveSubclusters(); + if (blackListSubClusters != null) { + + // Remove from the active SubClusters from StateStore the blacklisted ones + for (SubClusterId scId : blackListSubClusters) { + activeSubclusters.remove(scId); + } + } + validate(appSubmissionContext); int chosenPosition = Math.abs( http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index 2ca15bf..c124001 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -62,7 +63,8 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blacklist) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -76,6 +78,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy { long currBestMem = -1; for (Map.Entry entry : activeSubclusters .entrySet()) { + if (blacklist != null && blacklist.contains(entry.getKey())) { + continue; + } SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey()); if (weights.containsKey(id) && weights.get(id) > 0) { long availableMemory = getAvailableMemory(entry.getValue()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index 13d9140..59f8767 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blacklist) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -50,6 +52,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { Float currentBest = Float.MIN_VALUE; for (SubClusterId id : activeSubclusters.keySet()) { SubClusterIdInfo idInfo = new SubClusterIdInfo(id); + if (blacklist != null && blacklist.contains(id)) { + continue; + } if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) { currentBest = weights.get(idInfo); chosen = id; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java index faf3279..b4c0192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -27,8 +29,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; /** * This {@link FederationRouterPolicy} simply rejects all incoming requests. - * This is useful to prevent applications running in a queue to be run - * anywhere in the federated cluster. + * This is useful to prevent applications running in a queue to be run anywhere + * in the federated cluster. */ public class RejectRouterPolicy extends AbstractRouterPolicy { @@ -44,23 +46,31 @@ public class RejectRouterPolicy extends AbstractRouterPolicy { /** * The policy always reject requests. * - * @param appSubmissionContext the context for the app being submitted. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. + * + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. * * @return (never). * - * @throws YarnException (always) to prevent applications in this queue to - * be run anywhere in the federated cluster. + * @throws YarnException (always) to prevent applications in this queue to be + * run anywhere in the federated cluster. */ + @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // run standard validation, as error might differ validate(appSubmissionContext); throw new FederationPolicyException("The policy configured for this queue" + " (" + appSubmissionContext.getQueue() + ") reject all routing " - + "requests by construction. Application " + appSubmissionContext - .getApplicationId() + " cannot be routed to any RM."); + + "requests by construction. Application " + + appSubmissionContext.getApplicationId() + + " cannot be routed to any RM."); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index d820449..bc729b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -59,18 +59,24 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy { } /** - * Simply picks a random active subcluster to start the AM (this does NOT + * Simply picks a random active subCluster to start the AM (this does NOT * depend on the weights in the policy). * - * @param appSubmissionContext the context for the app being submitted - * (ignored). + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. + * + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. * * @return a randomly chosen subcluster. * * @throws YarnException if there are no active subclusters. */ + @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -79,6 +85,15 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy { getActiveSubclusters(); List list = new ArrayList<>(activeSubclusters.keySet()); + + if (blackListSubClusters != null) { + + // Remove from the active SubClusters from StateStore the blacklisted ones + for (SubClusterId scId : blackListSubClusters) { + list.remove(scId); + } + } + return list.get(rand.nextInt(list.size())); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index 5727134..7f230a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; import java.util.Random; @@ -41,7 +42,8 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blacklist) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -58,6 +60,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { float totActiveWeight = 0; for (Map.Entry entry : weights.entrySet()) { + if (blacklist != null && blacklist.contains(entry.getKey().toId())) { + continue; + } if (entry.getKey() != null && activeSubclusters.containsKey(entry.getKey().toId())) { totActiveWeight += entry.getValue(); @@ -66,6 +71,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { float lookupValue = rand.nextFloat() * totActiveWeight; for (SubClusterId id : activeSubclusters.keySet()) { + if (blacklist != null && blacklist.contains(id)) { + continue; + } SubClusterIdInfo idInfo = new SubClusterIdInfo(id); if (weights.containsKey(idInfo)) { lookupValue -= weights.get(idInfo); @@ -77,4 +85,5 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { // should never happen return null; } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 6bd8bf0..23978ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -104,7 +104,7 @@ public abstract class BaseFederationPoliciesTest { ConfigurableFederationPolicy localPolicy = getPolicy(); if (localPolicy instanceof FederationRouterPolicy) { ((FederationRouterPolicy) localPolicy) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); } else { String[] hosts = new String[] {"host1", "host2"}; List resourceRequests = FederationPoliciesTestUtil http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java index 5fa02d6..d0e2dec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java @@ -95,7 +95,7 @@ public class TestRouterPolicyFacade { // first call runs using standard UniformRandomRouterPolicy SubClusterId chosen = - routerFacade.getHomeSubcluster(applicationSubmissionContext); + routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertTrue(routerFacade.globalPolicyMap .get(queue1) instanceof UniformRandomRouterPolicy); @@ -107,7 +107,7 @@ public class TestRouterPolicyFacade { .newInstance(getPriorityPolicy(queue1))); // second call is routed by new policy PriorityRouterPolicy - chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(chosen.equals(subClusterIds.get(0))); Assert.assertTrue(routerFacade.globalPolicyMap .get(queue1) instanceof PriorityRouterPolicy); @@ -126,7 +126,7 @@ public class TestRouterPolicyFacade { // when invoked it returns the expected SubClusterId. SubClusterId chosen = - routerFacade.getHomeSubcluster(applicationSubmissionContext); + routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); // now the caching of policies must have added an entry for this queue @@ -160,19 +160,19 @@ public class TestRouterPolicyFacade { String uninitQueue = "non-initialized-queue"; when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue); SubClusterId chosen = - routerFacade.getHomeSubcluster(applicationSubmissionContext); + routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); // empty string when(applicationSubmissionContext.getQueue()).thenReturn(""); - chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); // null queue also falls back to default when(applicationSubmissionContext.getQueue()).thenReturn(null); - chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java index 2e7a0af..c7a7767 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java @@ -18,11 +18,19 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -40,12 +48,43 @@ public abstract class BaseRouterPoliciesTest ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, Resources.none(), null, false, null, null); SubClusterId chosen = - localPolicy.getHomeSubcluster(applicationSubmissionContext); + localPolicy.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertNotNull(chosen); } @Test(expected = FederationPolicyException.class) public void testNullAppContext() throws YarnException { - ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null); + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null, null); + } + + @Test + public void testBlacklistSubcluster() throws YarnException { + FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy(); + ApplicationSubmissionContext applicationSubmissionContext = + ApplicationSubmissionContext.newInstance(null, null, null, null, null, + false, false, 0, Resources.none(), null, false, null, null); + Map activeSubClusters = + getActiveSubclusters(); + if (activeSubClusters != null && activeSubClusters.size() > 1 + && !(localPolicy instanceof RejectRouterPolicy)) { + // blacklist all the active subcluster but one. + Random random = new Random(); + List blacklistSubclusters = + new ArrayList(activeSubClusters.keySet()); + SubClusterId removed = blacklistSubclusters + .remove(random.nextInt(blacklistSubclusters.size())); + // bias LoadBasedRouterPolicy + getPolicyInfo().getRouterPolicyWeights() + .put(new SubClusterIdInfo(removed), 1.0f); + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), getActiveSubclusters()); + + SubClusterId chosen = localPolicy.getHomeSubcluster( + applicationSubmissionContext, blacklistSubclusters); + + // check that the selected sub-cluster is only one not blacklisted + Assert.assertNotNull(chosen); + Assert.assertEquals(removed, chosen); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java index af7fe43..ee3e09d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java @@ -70,7 +70,7 @@ public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest { for (int i = 0; i < jobPerSub * numSubclusters; i++) { when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i); chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(applicationSubmissionContext); + .getHomeSubcluster(applicationSubmissionContext, null); counter.get(chosen).addAndGet(1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index b70b4aa..dc8f99b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -97,7 +97,7 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest { public void testLoadIsRespected() throws YarnException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); // check the "planted" best cluster is chosen Assert.assertEquals("sc05", chosen.getId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index 42d919d..3c036c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -78,7 +78,7 @@ public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest { @Test public void testPickLowestWeight() throws YarnException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); Assert.assertEquals("sc5", chosen.getId()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java index 049ebbf..1747f73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java @@ -47,7 +47,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest { @Test(expected = FederationPolicyException.class) public void testNoClusterIsChosen() throws YarnException { ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); } @Override @@ -57,7 +57,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest { ApplicationSubmissionContext applicationSubmissionContext = ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, Resources.none(), null, false, null, null); - localPolicy.getHomeSubcluster(applicationSubmissionContext); + localPolicy.getHomeSubcluster(applicationSubmissionContext, null); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java index b45aa2a..05490ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java @@ -57,7 +57,7 @@ public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest { @Test public void testOneSubclusterIsChosen() throws YarnException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e002563/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index 09173e6..c969a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -98,7 +98,7 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest { for (float i = 0; i < numberOfDraws; i++) { SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); counter.get(chosenId).incrementAndGet(); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org