hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [19/50] [abbrv] hadoop git commit: YARN-5325. Stateless AMRMProxy policies implementation. (Carlo Curino via Subru).
Date Thu, 22 Jun 2017 21:14:03 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index e57709f..5de749f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,8 +17,8 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -30,34 +30,27 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.util.Map;
-
 /**
  * This implements a simple load-balancing policy. The policy "weights" are
  * binary 0/1 values that enable/disable each sub-cluster, and the policy peaks
  * the sub-cluster with the least load to forward this application.
  */
-public class LoadBasedRouterPolicy
-    extends BaseWeightedRouterPolicy {
-
-  private static final Log LOG =
-      LogFactory.getLog(LoadBasedRouterPolicy.class);
+public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
 
   @Override
-  public void reinitialize(FederationPolicyInitializationContext
-      federationPolicyContext)
+  public void reinitialize(FederationPolicyInitializationContext policyContext)
       throws FederationPolicyInitializationException {
 
     // remember old policyInfo
     WeightedPolicyInfo tempPolicy = getPolicyInfo();
 
-    //attempt new initialization
-    super.reinitialize(federationPolicyContext);
+    // attempt new initialization
+    super.reinitialize(policyContext);
 
-    //check extra constraints
+    // check extra constraints
     for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) {
       if (weight != 0 && weight != 1) {
-        //reset to old policyInfo if check fails
+        // reset to old policyInfo if check fails
         setPolicyInfo(tempPolicy);
         throw new FederationPolicyInitializationException(
             this.getClass().getCanonicalName()
@@ -69,18 +62,16 @@ public class LoadBasedRouterPolicy
 
   @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext)
-      throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
-    Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
-        .getRouterPolicyWeights();
+    Map<SubClusterIdInfo, Float> weights =
+        getPolicyInfo().getRouterPolicyWeights();
     SubClusterIdInfo chosen = null;
     long currBestMem = -1;
-    for (Map.Entry<SubClusterId, SubClusterInfo> entry :
-        activeSubclusters
+    for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
         .entrySet()) {
       SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
       if (weights.containsKey(id) && weights.get(id) > 0) {
@@ -95,8 +86,7 @@ public class LoadBasedRouterPolicy
     return chosen.toId();
   }
 
-  private long getAvailableMemory(SubClusterInfo value)
-      throws YarnException {
+  private long getAvailableMemory(SubClusterInfo value) throws YarnException {
     try {
       long mem = -1;
       JSONObject obj = new JSONObject(value.getCapability());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index a8ac5f7..bc3a1f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,39 +17,32 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 
-import java.util.Map;
-
 /**
  * This implements a policy that interprets "weights" as a ordered list of
  * preferences among sub-clusters. Highest weight among active subclusters is
  * chosen.
  */
-public class PriorityRouterPolicy
-    extends BaseWeightedRouterPolicy {
-
-  private static final Log LOG =
-      LogFactory.getLog(PriorityRouterPolicy.class);
+public class PriorityRouterPolicy extends AbstractRouterPolicy {
 
   @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext)
-      throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
     // This finds the sub-cluster with the highest weight among the
     // currently active ones.
-    Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
-        .getRouterPolicyWeights();
+    Map<SubClusterIdInfo, Float> weights =
+        getPolicyInfo().getRouterPolicyWeights();
     SubClusterId chosen = null;
     Float currentBest = Float.MIN_VALUE;
     for (SubClusterId id : activeSubclusters.keySet()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index 1774961..b8f9cc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -17,6 +17,11 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -25,11 +30,6 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
 /**
  * This simple policy picks at uniform random among any of the currently active
  * subclusters. This policy is easy to use and good for testing.
@@ -39,7 +39,7 @@ import java.util.Random;
  * of the "weights", in which case the {@link UniformRandomRouterPolicy} send
  * load to them, while {@code WeightedRandomRouterPolicy} does not.
  */
-public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
+public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
 
   private Random rand;
 
@@ -49,14 +49,14 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
 
   @Override
   public void reinitialize(
-      FederationPolicyInitializationContext federationPolicyContext)
+      FederationPolicyInitializationContext policyContext)
       throws FederationPolicyInitializationException {
     FederationPolicyInitializationContextValidator
-        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+        .validate(policyContext, this.getClass().getCanonicalName());
 
-    //note: this overrides BaseWeighterRouterPolicy and ignores the weights
+    // note: this overrides AbstractRouterPolicy and ignores the weights
 
-    setPolicyContext(federationPolicyContext);
+    setPolicyContext(policyContext);
   }
 
   /**
@@ -64,21 +64,19 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
    * depend on the weights in the policy).
    *
    * @param appSubmissionContext the context for the app being submitted
-   *                             (ignored).
+   *          (ignored).
    *
    * @return a randomly chosen subcluster.
    *
    * @throws YarnException if there are no active subclusters.
    */
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext)
-      throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
-    List<SubClusterId> list =
-        new ArrayList<>(activeSubclusters.keySet());
+    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
     return list.get(rand.nextInt(list.size()));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index 0777677..ac75ae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -18,32 +18,30 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-
-import java.util.Map;
-import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This policy implements a weighted random sample among currently active
  * sub-clusters.
  */
-public class WeightedRandomRouterPolicy
-    extends BaseWeightedRouterPolicy {
+public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
 
-  private static final Log LOG =
-      LogFactory.getLog(WeightedRandomRouterPolicy.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WeightedRandomRouterPolicy.class);
   private Random rand = new Random(System.currentTimeMillis());
 
   @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext)
-      throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
@@ -52,13 +50,13 @@ public class WeightedRandomRouterPolicy
     // changes dynamically (and this would unfairly spread the load to
     // sub-clusters adjacent to an inactive one), hence we need to count/scan
     // the list and based on weight pick the next sub-cluster.
-    Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
-        .getRouterPolicyWeights();
+    Map<SubClusterIdInfo, Float> weights =
+        getPolicyInfo().getRouterPolicyWeights();
 
     float totActiveWeight = 0;
-    for(Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()){
-      if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey()
-          .toId())){
+    for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
+      if (entry.getKey() != null
+          && activeSubclusters.containsKey(entry.getKey().toId())) {
         totActiveWeight += entry.getValue();
       }
     }
@@ -73,7 +71,7 @@ public class WeightedRandomRouterPolicy
         return id;
       }
     }
-    //should never happen
+    // should never happen
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
index 5d0fcb6..e445ac3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
@@ -17,4 +17,3 @@
  */
 /** Router policies. **/
 package org.apache.hadoop.yarn.server.federation.policies.router;
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
index 8238633..6b4f60c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
@@ -57,11 +57,11 @@ public abstract class AbstractSubClusterResolver implements SubClusterResolver {
     return rackToSubClusters.get(rackname);
   }
 
-  protected Map<String, SubClusterId> getNodeToSubCluster() {
+  public Map<String, SubClusterId> getNodeToSubCluster() {
     return nodeToSubCluster;
   }
 
-  protected Map<String, Set<SubClusterId>> getRackToSubClusters() {
+  public Map<String, Set<SubClusterId>> getRackToSubClusters() {
     return rackToSubClusters;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 8da92b9..ba897da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -22,14 +22,17 @@ import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -49,6 +52,7 @@ public abstract class BaseFederationPoliciesTest {
   private ApplicationSubmissionContext applicationSubmissionContext =
       mock(ApplicationSubmissionContext.class);
   private Random rand = new Random();
+  private SubClusterId homeSubCluster;
 
   @Test
   public void testReinitilialize() throws YarnException {
@@ -88,16 +92,22 @@ public abstract class BaseFederationPoliciesTest {
     getPolicy().reinitialize(fpc);
   }
 
-  @Test(expected = NoActiveSubclustersException.class)
+  @Test(expected = FederationPolicyException.class)
   public void testNoSubclusters() throws YarnException {
     // empty the activeSubclusters map
     FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
         getPolicyInfo(), new HashMap<>());
 
-    ConfigurableFederationPolicy currentPolicy = getPolicy();
-    if (currentPolicy instanceof FederationRouterPolicy) {
-      ((FederationRouterPolicy) currentPolicy)
+    ConfigurableFederationPolicy localPolicy = getPolicy();
+    if (localPolicy instanceof FederationRouterPolicy) {
+      ((FederationRouterPolicy) localPolicy)
           .getHomeSubcluster(getApplicationSubmissionContext());
+    } else {
+      String[] hosts = new String[] {"host1", "host2" };
+      List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+          .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+      ((FederationAMRMProxyPolicy) localPolicy)
+          .splitResourceRequests(resourceRequests);
     }
   }
 
@@ -152,4 +162,12 @@ public abstract class BaseFederationPoliciesTest {
     this.rand = rand;
   }
 
+  public SubClusterId getHomeSubCluster() {
+    return homeSubCluster;
+  }
+
+  public void setHomeSubCluster(SubClusterId homeSubCluster) {
+    this.homeSubCluster = homeSubCluster;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index e840b3f..c79fd2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -16,22 +16,20 @@
  * limitations under the License.
  */
 
-
 package org.apache.hadoop.yarn.server.federation.policies;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
 import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-
 /**
  * Test class for {@link FederationPolicyInitializationContextValidator}.
  */
@@ -45,11 +43,10 @@ public class TestFederationPolicyInitializationContextValidator {
   @Before
   public void setUp() throws Exception {
     goodFacade = FederationPoliciesTestUtil.initFacade();
-    goodConfig =
-        new MockPolicyManager().serializeConf();
-    goodSR =FederationPoliciesTestUtil.initResolver();
-    context = new
-        FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade);
+    goodConfig = new MockPolicyManager().serializeConf();
+    goodSR = FederationPoliciesTestUtil.initResolver();
+    context = new FederationPolicyInitializationContext(goodConfig, goodSR,
+        goodFacade);
   }
 
   @Test
@@ -100,8 +97,7 @@ public class TestFederationPolicyInitializationContextValidator {
 
     @Override
     public FederationAMRMProxyPolicy getAMRMPolicy(
-        FederationPolicyInitializationContext
-            federationPolicyInitializationContext,
+        FederationPolicyInitializationContext policyContext,
         FederationAMRMProxyPolicy oldInstance)
         throws FederationPolicyInitializationException {
       return null;
@@ -109,8 +105,7 @@ public class TestFederationPolicyInitializationContextValidator {
 
     @Override
     public FederationRouterPolicy getRouterPolicy(
-        FederationPolicyInitializationContext
-            federationPolicyInitializationContext,
+        FederationPolicyInitializationContext policyContext,
         FederationRouterPolicy oldInstance)
         throws FederationPolicyInitializationException {
       return null;
@@ -120,8 +115,8 @@ public class TestFederationPolicyInitializationContextValidator {
     public SubClusterPolicyConfiguration serializeConf()
         throws FederationPolicyInitializationException {
       ByteBuffer buf = ByteBuffer.allocate(0);
-      return SubClusterPolicyConfiguration
-          .newInstance("queue1", this.getClass().getCanonicalName(), buf);
+      return SubClusterPolicyConfiguration.newInstance("queue1",
+          this.getClass().getCanonicalName(), buf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..a21f53d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test class for the {@link BroadcastAMRMProxyPolicy}.
+ */
+public class TestBroadcastAMRMProxyFederationPolicy
+    extends BaseFederationPoliciesTest {
+
+  @Before
+  public void setUp() throws Exception {
+    setPolicy(new BroadcastAMRMProxyPolicy());
+    // needed for base test to work
+    setPolicyInfo(mock(WeightedPolicyInfo.class));
+
+    for (int i = 1; i <= 2; i++) {
+      SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+      SubClusterInfo sci = mock(SubClusterInfo.class);
+      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+      when(sci.getSubClusterId()).thenReturn(sc.toId());
+      getActiveSubclusters().put(sc.toId(), sci);
+    }
+
+    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+        mock(WeightedPolicyInfo.class), getActiveSubclusters());
+
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+    // verify the request is broadcast to all subclusters
+    String[] hosts = new String[] {"host1", "host2" };
+    List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+        .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+    Assert.assertTrue(response.size() == 2);
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+        .entrySet()) {
+      Assert.assertTrue(getActiveSubclusters().get(entry.getKey()) != null);
+      for (ResourceRequest r : entry.getValue()) {
+        Assert.assertTrue(resourceRequests.contains(r));
+      }
+    }
+    for (SubClusterId subClusterId : getActiveSubclusters().keySet()) {
+      for (ResourceRequest r : response.get(subClusterId)) {
+        Assert.assertTrue(resourceRequests.contains(r));
+      }
+    }
+  }
+
+  @Test
+  public void testNotifyOfResponse() throws Exception {
+    String[] hosts = new String[] {"host1", "host2" };
+    List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+        .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+
+    try {
+      ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
+          SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
+      Assert.fail();
+    } catch (FederationPolicyException f) {
+      System.out.println("Expected: " + f.getMessage());
+    }
+
+    ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
+        SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
new file mode 100644
index 0000000..2654a06
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple test class for the {@link LocalityMulticastAMRMProxyPolicy}.
+ */
+public class TestLocalityMulticastAMRMProxyPolicy
+    extends BaseFederationPoliciesTest {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class);
+
+  @Before
+  public void setUp() throws Exception {
+    setPolicy(new LocalityMulticastAMRMProxyPolicy());
+    setPolicyInfo(new WeightedPolicyInfo());
+    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+    // simulate 6 subclusters (subcluster0 .. subcluster5)
+    for (int i = 0; i < 6; i++) {
+      SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
+      // sub-cluster 3 is not active
+      if (i != 3) {
+        SubClusterInfo sci = mock(SubClusterInfo.class);
+        when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+        when(sci.getSubClusterId()).thenReturn(sc.toId());
+        getActiveSubclusters().put(sc.toId(), sci);
+      }
+
+      float weight = 1 / 10f;
+      routerWeights.put(sc, weight);
+      amrmWeights.put(sc, weight);
+      // sub-cluster 4 is "disabled" in the weights
+      if (i == 4) {
+        routerWeights.put(sc, 0f);
+        amrmWeights.put(sc, 0f);
+      }
+    }
+
+    getPolicyInfo().setRouterPolicyWeights(routerWeights);
+    getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
+    getPolicyInfo().setHeadroomAlpha(0.5f);
+    setHomeSubCluster(SubClusterId.newInstance("homesubcluster"));
+
+  }
+
+  @Test
+  public void testReinitilialize() throws YarnException {
+    initializePolicy();
+  }
+
+  private void initializePolicy() throws YarnException {
+    setFederationPolicyContext(new FederationPolicyInitializationContext());
+    SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
+    getFederationPolicyContext().setFederationSubclusterResolver(resolver);
+    ByteBuffer buf = getPolicyInfo().toByteBuffer();
+    getFederationPolicyContext().setSubClusterPolicyConfiguration(
+        SubClusterPolicyConfiguration.newInstance("queue1",
+            getPolicy().getClass().getCanonicalName(), buf));
+    getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
+    FederationPoliciesTestUtil.initializePolicyContext(
+        getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
+        getActiveSubclusters());
+  }
+
+  @Test
+  public void testSplitBasedOnHeadroom() throws Exception {
+
+    // Tests how the headroom info is used to split based on the capacity
+    // each RM claims to give us.
+    // Configure policy to be 100% headroom based
+    getPolicyInfo().setHeadroomAlpha(1.0f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    prepPolicyWithHeadroom();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+
+    // pretty print requests
+    LOG.info("Initial headroom");
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    // based on headroom, we expect 75 containers to go to subcluster0, as it
+    // advertises lots of headroom (100), no containers for subcluster1 as it
+    // advertises zero headroom, 1 to subcluster2 as it advertises little
+    // headroom (1), and 25 to subcluster5, which has unknown headroom and so
+    // gets 1/4th of the load
+    checkExpectedAllocation(response, "subcluster0", 1, 75);
+    checkExpectedAllocation(response, "subcluster1", 1, -1);
+    checkExpectedAllocation(response, "subcluster2", 1, 1);
+    checkExpectedAllocation(response, "subcluster5", 1, 25);
+
+    // notify a change in headroom and split again, keeping the new answer
+    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+    ((FederationAMRMProxyPolicy) getPolicy())
+        .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
+    response = ((FederationAMRMProxyPolicy) getPolicy())
+        .splitResourceRequests(resourceRequests);
+
+    LOG.info("After headroom update");
+    prettyPrintRequests(response);
+    validateSplit(response, resourceRequests);
+
+    // we simulated a change in headroom for subcluster2, which will now
+    // have the same headroom of subcluster0 and so it splits the requests
+    // note that the total is still less or equal to (userAsk + numSubClusters)
+    checkExpectedAllocation(response, "subcluster0", 1, 38);
+    checkExpectedAllocation(response, "subcluster1", 1, -1);
+    checkExpectedAllocation(response, "subcluster2", 1, 38);
+    checkExpectedAllocation(response, "subcluster5", 1, 25);
+
+  }
+
+  @Test(timeout = 5000)
+  public void testStressPolicy() throws Exception {
+
+    // Stress test: repeatedly splits a large random list of requests and
+    // validates every split; the whole test must finish within the timeout.
+    // Configure policy to be 100% headroom based
+    getPolicyInfo().setHeadroomAlpha(1.0f);
+
+    initializePolicy();
+
+    int numRR = 1000;
+    List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
+
+    prepPolicyWithHeadroom();
+
+    int numIterations = 1000;
+    long tstart = System.currentTimeMillis();
+    for (int i = 0; i < numIterations; i++) {
+      Map<SubClusterId, List<ResourceRequest>> response =
+          ((FederationAMRMProxyPolicy) getPolicy())
+              .splitResourceRequests(resourceRequests);
+      validateSplit(response, resourceRequests);
+    }
+    long tend = System.currentTimeMillis();
+
+    LOG.info("Performed " + numIterations + " policy invocations (and "
+        + "validations) in " + (tend - tstart) + "ms");
+  }
+
+  @Test
+  public void testFWDAllZeroANY() throws Exception {
+
+    // Tests that a zero-sized ANY request is forwarded to every subcluster
+    // we have received a response from.
+    // Configure policy to be 50% headroom based and 50% weight based
+    getPolicyInfo().setHeadroomAlpha(0.5f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
+
+    // this receives responses from sc0,sc1,sc2
+    prepPolicyWithHeadroom();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+
+    // we expect all three to appear for a zero-sized ANY
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    // we expect the zero size request to be sent to the first 3 RMs (due to
+    // the fact that we received responses only from these 3 subclusters)
+    checkExpectedAllocation(response, "subcluster0", 1, 0);
+    checkExpectedAllocation(response, "subcluster1", 1, 0);
+    checkExpectedAllocation(response, "subcluster2", 1, 0);
+    checkExpectedAllocation(response, "subcluster3", -1, -1);
+    checkExpectedAllocation(response, "subcluster4", -1, -1);
+    checkExpectedAllocation(response, "subcluster5", -1, -1);
+  }
+
+  @Test
+  public void testSplitBasedOnHeadroomAndWeights() throws Exception {
+
+    // Tests how the headroom info is used to split based on the capacity
+    // each RM claims to give us.
+
+    // Configure policy to be 50% headroom based and 50% weight based
+    getPolicyInfo().setHeadroomAlpha(0.5f);
+
+    initializePolicy();
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    prepPolicyWithHeadroom();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    // in this case the headroom allocates 50 containers, while weights allocate
+    // the rest. Due to weights we have 12.5 (rounded to 13) containers for each
+    // subcluster, the rest is due to headroom.
+    checkExpectedAllocation(response, "subcluster0", 1, 50);
+    checkExpectedAllocation(response, "subcluster1", 1, 13);
+    checkExpectedAllocation(response, "subcluster2", 1, 13);
+    checkExpectedAllocation(response, "subcluster3", -1, -1);
+    checkExpectedAllocation(response, "subcluster4", -1, -1);
+    checkExpectedAllocation(response, "subcluster5", 1, 25);
+
+  }
+
+  private void prepPolicyWithHeadroom() throws YarnException {
+    AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+    ((FederationAMRMProxyPolicy) getPolicy())
+        .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+    // subcluster1 reports zero headroom
+    ar = getAllocateResponseWithTargetHeadroom(0);
+    ((FederationAMRMProxyPolicy) getPolicy())
+        .notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar);
+    // subcluster2 reports headroom for 1 container
+    ar = getAllocateResponseWithTargetHeadroom(1);
+    ((FederationAMRMProxyPolicy) getPolicy())
+        .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
+  }
+
+  private AllocateResponse getAllocateResponseWithTargetHeadroom(
+      int numContainers) {
+    return AllocateResponse.newInstance(0, null, null,
+        Collections.<NodeReport> emptyList(),
+        Resource.newInstance(numContainers * 1024, numContainers), null, 10,
+        null, Collections.<NMToken> emptyList());
+  }
+
+  @Test
+  public void testSplitAllocateRequest() throws Exception {
+
+    // Test a complex List<ResourceRequest> is split correctly
+    initializePolicy();
+
+    // modify default initialization to include a "homesubcluster"
+    // which we will use as the default for when nodes or racks are unknown
+    SubClusterInfo sci = mock(SubClusterInfo.class);
+    when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+    when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
+    getActiveSubclusters().put(getHomeSubCluster(), sci);
+    SubClusterIdInfo sc = new SubClusterIdInfo(getHomeSubCluster().getId());
+
+    getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
+    getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
+
+    FederationPoliciesTestUtil.initializePolicyContext(
+        getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
+        getActiveSubclusters());
+
+    List<ResourceRequest> resourceRequests = createComplexRequest();
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+
+    validateSplit(response, resourceRequests);
+    prettyPrintRequests(response);
+
+    // we expect 4 entries for home subcluster (3 for request-id 4, and a part
+    // of the broadcast of request-id 2)
+    checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23);
+
+    // for subcluster0 we expect 3 entries from request-id 0, and 3 from
+    // request-id 3, as well as part of the request-id 2 broadcast
+    checkExpectedAllocation(response, "subcluster0", 7, 26);
+
+    // we expect 5 entries for subcluster1 (4 from request-id 1, and part
+    // of the broadcast of request-id 2)
+    checkExpectedAllocation(response, "subcluster1", 5, 25);
+
+    // sub-cluster 2 should contain 3 entries from request-id 1 and 1 from the
+    // broadcast of request-id 2, and no request-id 0
+    checkExpectedAllocation(response, "subcluster2", 4, 23);
+
+    // subcluster id 3, 4 should not appear (due to weights or active/inactive)
+    checkExpectedAllocation(response, "subcluster3", -1, -1);
+    checkExpectedAllocation(response, "subcluster4", -1, -1);
+
+    // subcluster5 should get only part of the request-id 2 broadcast
+    checkExpectedAllocation(response, "subcluster5", 1, 20);
+
+    // check that the allocations that show up are what we expect
+    for (ResourceRequest rr : response.get(getHomeSubCluster())) {
+      Assert.assertTrue(rr.getAllocationRequestId() == 4L
+          || rr.getAllocationRequestId() == 2L);
+    }
+
+    for (ResourceRequest rr : response.get(getHomeSubCluster())) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 1L);
+    }
+
+    List<ResourceRequest> rrs =
+        response.get(SubClusterId.newInstance("subcluster0"));
+    for (ResourceRequest rr : rrs) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 1L);
+    }
+
+    for (ResourceRequest rr : response
+        .get(SubClusterId.newInstance("subcluster2"))) {
+      Assert.assertTrue(rr.getAllocationRequestId() != 0L);
+    }
+
+    for (ResourceRequest rr : response
+        .get(SubClusterId.newInstance("subcluster5"))) {
+      Assert.assertTrue(rr.getAllocationRequestId() >= 2);
+      Assert.assertTrue(rr.getRelaxLocality());
+    }
+  }
+
+  // check that the response for this sub-cluster contains the expected number
+  // of ResourceRequests and, summed over them, the expected number of
+  // containers. totContainers == -1 means the response should be null
+  private void checkExpectedAllocation(
+      Map<SubClusterId, List<ResourceRequest>> response, String subCluster,
+      long totResourceRequests, long totContainers) {
+    if (totContainers == -1) {
+      Assert.assertNull(response.get(SubClusterId.newInstance(subCluster)));
+    } else {
+      SubClusterId sc = SubClusterId.newInstance(subCluster);
+      Assert.assertEquals(totResourceRequests, response.get(sc).size());
+
+      long actualContCount = 0;
+      for (ResourceRequest rr : response.get(sc)) {
+        actualContCount += rr.getNumContainers();
+      }
+      Assert.assertEquals(totContainers, actualContCount);
+    }
+  }
+
+  private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
+      List<ResourceRequest> original) throws YarnException {
+
+    SubClusterResolver resolver =
+        getFederationPolicyContext().getFederationSubclusterResolver();
+
+    // Apply general validation rules
+    int numUsedSubclusters = split.size();
+
+    Set<Long> originalIds = new HashSet<>();
+    Set<Long> splitIds = new HashSet<>();
+
+    int originalContainers = 0;
+    for (ResourceRequest rr : original) {
+      originalContainers += rr.getNumContainers();
+      originalIds.add(rr.getAllocationRequestId());
+    }
+
+    int splitContainers = 0;
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split
+        .entrySet()) {
+      for (ResourceRequest rr : rrs.getValue()) {
+        splitContainers += rr.getNumContainers();
+        splitIds.add(rr.getAllocationRequestId());
+        // check node-local asks are sent to right RM (only)
+        SubClusterId fid = null;
+        try {
+          fid = resolver.getSubClusterForNode(rr.getResourceName());
+        } catch (YarnException e) {
+          // resolution failed: fid stays null, so the check below is skipped
+        }
+        if (!rrs.getKey().equals(getHomeSubCluster()) && fid != null
+            && !fid.equals(rrs.getKey())) {
+          Assert.fail("A node-local (or resolvable rack-local) RR should not "
+              + "be send to an RM other than what it resolves to.");
+        }
+      }
+    }
+
+    // check we are not inventing Allocation Ids
+    Assert.assertEquals(originalIds, splitIds);
+
+    // check we are not exceedingly replicating the container asks among
+    // RMs (a little is allowed due to rounding of fractional splits)
+    Assert.assertTrue(
+        " Containers requested (" + splitContainers + ") should "
+            + "not exceed the original count of containers ("
+            + originalContainers + ") by more than the number of subclusters ("
+            + numUsedSubclusters + ")",
+        originalContainers + numUsedSubclusters >= splitContainers);
+
+    // Test target Ids
+    for (SubClusterId targetId : split.keySet()) {
+      Assert.assertTrue("Target subclusters should be in the active set",
+          getActiveSubclusters().containsKey(targetId));
+      Assert.assertTrue(
+          "Target subclusters (" + targetId + ") should have weight >0 in "
+              + "the policy ",
+          getPolicyInfo().getRouterPolicyWeights()
+              .get(new SubClusterIdInfo(targetId)) > 0);
+    }
+  }
+
+  private void prettyPrintRequests(
+      Map<SubClusterId, List<ResourceRequest>> response) {
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+        .entrySet()) {
+      StringBuilder str = new StringBuilder(); // one log line per subcluster
+      for (ResourceRequest rr : entry.getValue()) {
+        str.append(" [id:").append(rr.getAllocationRequestId())
+            .append(" loc:").append(rr.getResourceName())
+            .append(" numCont:").append(rr.getNumContainers()).append("], ");
+      }
+      LOG.info(entry.getKey() + " --> " + str);
+    }
+  }
+
+  private List<ResourceRequest> createLargeRandomList(int numRR)
+      throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+    Random rand = new Random(1);
+    DefaultSubClusterResolverImpl resolver =
+        (DefaultSubClusterResolverImpl) getFederationPolicyContext()
+            .getFederationSubclusterResolver();
+
+    List<String> nodes =
+        new ArrayList<>(resolver.getNodeToSubCluster().keySet());
+
+    for (int i = 0; i < numRR; i++) {
+      String nodeName = nodes.get(rand.nextInt(nodes.size()));
+      long allocationId = (long) rand.nextInt(20);
+
+      // create a request on a randomly chosen node known to the resolver
+      out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId,
+          nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean()));
+    }
+    return out;
+  }
+
+  private List<ResourceRequest> createSimpleRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single ANY request for 100 containers
+    out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+        ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
+    return out;
+  }
+
+  private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single ANY request for zero containers
+    out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+        ResourceRequest.ANY, 1024, 1, 1, 0, null, true));
+    return out;
+  }
+
+  private List<ResourceRequest> createComplexRequest() throws Exception {
+
+    List<ResourceRequest> out = new ArrayList<>();
+
+    // create a single container request in sc0
+    out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+        "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+        "subcluster0-rack0", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+        ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
+
+    // create a single container request with 3 alternative hosts across sc1,sc2
+    // where we want 2 containers in sc1 and 1 in sc2
+    out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+        "subcluster1-rack1-host1", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+        "subcluster1-rack1-host2", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+        "subcluster2-rack3-host3", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+        "subcluster1-rack1", 1024, 1, 1, 2, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+        "subcluster2-rack3", 1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+        ResourceRequest.ANY, 1024, 1, 1, 2, null, false));
+
+    // create a non-local ANY request that can span anything
+    out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
+        ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
+
+    // create a single container request in sc0 with relaxed locality
+    out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
+        "subcluster0-rack0-host0", 1024, 1, 1, 1, null, true));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
+        "subcluster0-rack0", 1024, 1, 1, 1, null, true));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
+        ResourceRequest.ANY, 1024, 1, 1, 1, null, true));
+
+    // create a request of an unknown node/rack and expect this to show up
+    // in homesubcluster
+    out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode",
+        1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack",
+        1024, 1, 1, 1, null, false));
+    out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
+        ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
+
+    return out;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
index 9e94f72..906e35f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
@@ -17,6 +17,9 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@@ -29,12 +32,9 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
- * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the
- * load is properly considered for allocation.
+ * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
+ * is properly considered for allocation.
  */
 public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
 
@@ -47,12 +47,10 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
 
     // simulate 20 active subclusters
     for (int i = 0; i < 20; i++) {
-      SubClusterIdInfo sc =
-          new SubClusterIdInfo(String.format("sc%02d", i));
+      SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
       SubClusterInfo federationSubClusterInfo =
           SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
-              SubClusterState.SC_RUNNING, -1,
-              generateClusterMetricsInfo(i));
+              SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i));
       getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
       float weight = getRand().nextInt(2);
       if (i == 5) {
@@ -76,7 +74,7 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
   private String generateClusterMetricsInfo(int id) {
 
     long mem = 1024 * getRand().nextInt(277 * 100 - 1);
-    //plant a best cluster
+    // plant a best cluster
     if (id == 5) {
       mem = 1024 * 277 * 100;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
index ff5175d..eefcfd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
@@ -16,6 +16,12 @@
  */
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@@ -28,12 +34,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Simple test class for the {@link PriorityRouterPolicy}. Tests that the
  * weights are correctly used for ordering the choice of sub-clusters.
@@ -72,8 +72,7 @@ public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest {
     getPolicyInfo().setRouterPolicyWeights(routerWeights);
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
     FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(),
-        getActiveSubclusters());
+        getPolicyInfo(), getActiveSubclusters());
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
index a612685..78967d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -17,6 +17,13 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@@ -29,13 +36,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large
  * number of randomized tests to check we are weighiting correctly even if
@@ -71,8 +71,7 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
 
     FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(),
-        getActiveSubclusters());
+        getPolicyInfo(), getActiveSubclusters());
 
   }
 
@@ -88,8 +87,8 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
     float numberOfDraws = 1000000;
 
     for (float i = 0; i < numberOfDraws; i++) {
-      SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()).
-          getHomeSubcluster(getApplicationSubmissionContext());
+      SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
+          .getHomeSubcluster(getApplicationSubmissionContext());
       counter.get(chosenId).incrementAndGet();
     }
 
@@ -113,13 +112,15 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
       if (getActiveSubclusters().containsKey(counterEntry.getKey())) {
         Assert.assertTrue(
             "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
-                + " expected weight: " + expectedWeight, expectedWeight == 0 ||
-                (actualWeight / expectedWeight) < 1.1
-                    && (actualWeight / expectedWeight) > 0.9);
+                + " expected weight: " + expectedWeight,
+            expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1
+                && (actualWeight / expectedWeight) > 0.9);
       } else {
-        Assert.assertTrue(
-            "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
-                + " expected weight: " + expectedWeight, actualWeight == 0);
+        Assert
+            .assertTrue(
+                "Id " + counterEntry.getKey() + " Actual weight: "
+                    + actualWeight + " expected weight: " + expectedWeight,
+                actualWeight == 0);
 
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index f901329..87ed8d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.yarn.server.federation.utils;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolv
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.*;
+import org.apache.hadoop.yarn.util.Records;
 
 import java.net.URL;
 import java.nio.ByteBuffer;
@@ -48,6 +50,68 @@ public final class FederationPoliciesTestUtil {
     // disabled.
   }
 
+  private static final String FEDR_NODE_PREFIX = "fedr-test-node-";
+
+
+  public static List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression, boolean relaxLocality) throws YarnException {
+    List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+    for (String host : hosts) {
+      ResourceRequest hostReq =
+          createResourceRequest(host, memory, vCores, priority, containers,
+              labelExpression, relaxLocality);
+      reqs.add(hostReq);
+      ResourceRequest rackReq =
+          createResourceRequest("/default-rack", memory, vCores, priority,
+              containers, labelExpression, relaxLocality);
+      reqs.add(rackReq);
+    }
+
+    ResourceRequest offRackReq =
+        createResourceRequest(ResourceRequest.ANY, memory, vCores, priority,
+            containers, labelExpression, relaxLocality);
+    reqs.add(offRackReq);
+    return reqs;
+  }
+
+  protected static ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers,
+      boolean relaxLocality) throws YarnException {
+    return createResourceRequest(resource, memory, vCores, priority, containers,
+        null, relaxLocality);
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public static ResourceRequest createResourceRequest(long id, String resource,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression, boolean relaxLocality) throws YarnException {
+    ResourceRequest out =
+        createResourceRequest(resource, memory, vCores, priority, containers,
+            labelExpression, relaxLocality);
+    out.setAllocationRequestId(id);
+    return out;
+  }
+
+  public static ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression, boolean relaxLocality) throws YarnException {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setResourceName(resource);
+    req.setNumContainers(containers);
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(priority);
+    req.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemorySize(memory);
+    capability.setVirtualCores(vCores);
+    req.setCapability(capability);
+    if (labelExpression != null) {
+      req.setNodeLabelExpression(labelExpression);
+    }
+    req.setRelaxLocality(relaxLocality);
+    return req;
+  }
 
   public static void initializePolicyContext(
       FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db923830/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
index e4d6112..2b7e237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
@@ -1,4 +1,8 @@
 node1,subcluster1,rack1
  node2 , subcluster2, RACK1
 noDE3,subcluster3, rack2
-node4, subcluster3, rack2
\ No newline at end of file
+node4, subcluster3, rack2
+subcluster0-rack0-host0,subcluster0, subcluster0-rack0
+Subcluster1-RACK1-HOST1,subcluster1, subCluster1-RACK1
+SUBCLUSTER1-RACK1-HOST2,subcluster1, subCluster1-RACK1
+SubCluster2-RACK3-HOST3,subcluster2, subcluster2-rack3


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message