hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [22/32] hadoop git commit: YARN-6599. Support anti-affinity constraint via AppPlacementAllocator. (Wangda Tan via asuresh)
Date Wed, 31 Jan 2018 15:57:46 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index a3b88c0..01d5e6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -170,7 +170,7 @@ public class TestCapacitySchedulerAutoQueueCreation
           1 * GB, 1, true, priority, recordFactory);
 
       cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1),
-          Collections.<ContainerId>emptyList(), Collections.singletonList(host),
+          null, Collections.<ContainerId>emptyList(), Collections.singletonList(host),
           null, NULL_UPDATE_REQUESTS);
 
       //And this will result in container assignment for app1

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
new file mode 100644
index 0000000..b6ac4b6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class TestCapacitySchedulerSchedulingRequestUpdate
+    extends CapacitySchedulerTestBase {
+  @Test
+  public void testBasicPendingResourceUpdate() throws Exception {
+    Configuration conf = TestUtils.getConfigurationWithQueueLabels(
+        new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    MockRM rm = new MockRM(conf) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.start();
+    MockNM nm1 = // label = x
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 = // label = ""
+        new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    // Launch app1 in queue=a1
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    // Launch app2 in queue=b1
+    RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+    // am1 asks for 8 * 1GB container for no label
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+        Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"),
+        "mapper", "reducer");
+
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "root", 8 * GB, null);
+
+    // am2 asks for 8 * 1GB container for no label
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 8)), null);
+
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "b1", 8 * GB, null);
+    checkPendingResource(rm, "b", 8 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 16 * GB, null);
+
+    // am2 asks for 8 * 1GB container in another priority for no label
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(2), "*",
+            Resources.createResource(1 * GB), 8)), null);
+
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 24 * GB, null);
+
+    // am1 now asks for 4 * 1GB containers (instead of 8 * 1GB) at priority=1,
+    // for no label; this should replace the earlier 8 * 1GB request
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)),
+        Priority.newInstance(1), 0, ImmutableSet.of("mapper", "reducer"),
+        "mapper", "reducer");
+
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 20 * GB, null);
+
+    // am1 asks for one 8GB container with label=x
+    am1.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(2), "*",
+            Resources.createResource(8 * GB), 1, true, "x")), null);
+
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "a1", 8 * GB, "x");
+    checkPendingResource(rm, "a", 8 * GB, "x");
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 20 * GB, null);
+    checkPendingResource(rm, "root", 8 * GB, "x");
+
+    // complete am1/am2, pending resource should be 0 now
+    AppAttemptRemovedSchedulerEvent appRemovedEvent =
+        new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(),
+            RMAppAttemptState.FINISHED, false);
+    rm.getResourceScheduler().handle(appRemovedEvent);
+    appRemovedEvent = new AppAttemptRemovedSchedulerEvent(
+        am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+    rm.getResourceScheduler().handle(appRemovedEvent);
+
+    checkPendingResource(rm, "a1", 0 * GB, null);
+    checkPendingResource(rm, "a", 0 * GB, null);
+    checkPendingResource(rm, "a1", 0 * GB, "x");
+    checkPendingResource(rm, "a", 0 * GB, "x");
+    checkPendingResource(rm, "b1", 0 * GB, null);
+    checkPendingResource(rm, "b", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, "x");
+  }
+
+  @Test
+  public void testNodePartitionPendingResourceUpdate() throws Exception {
+    Configuration conf = TestUtils.getConfigurationWithQueueLabels(
+        new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    MockRM rm = new MockRM(conf) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.start();
+    MockNM nm1 = // label = x
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 = // label = ""
+        new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    // Launch app1 in queue=a1
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    // Launch app2 in queue=b1
+    RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+    // am1 asks for 8 * 1GB container for "x"
+    am1.allocateIntraAppAntiAffinity("x",
+        ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+        Priority.newInstance(1), 0, "mapper", "reducer");
+
+    checkPendingResource(rm, "a1", 8 * GB, "x");
+    checkPendingResource(rm, "a", 8 * GB, "x");
+    checkPendingResource(rm, "root", 8 * GB, "x");
+
+    // am2 asks for 8 * 1GB container for "x"
+    am2.allocateIntraAppAntiAffinity("x",
+        ResourceSizing.newInstance(8, Resource.newInstance(1 * GB, 1)),
+        Priority.newInstance(1), 0, "mapper", "reducer");
+
+    checkPendingResource(rm, "a1", 8 * GB, "x");
+    checkPendingResource(rm, "a", 8 * GB, "x");
+    checkPendingResource(rm, "b1", 8 * GB, "x");
+    checkPendingResource(rm, "b", 8 * GB, "x");
+    // root = a + b
+    checkPendingResource(rm, "root", 16 * GB, "x");
+
+    // am1 asks for 6 * 1GB container for "x" in another priority
+    am1.allocateIntraAppAntiAffinity("x",
+        ResourceSizing.newInstance(6, Resource.newInstance(1 * GB, 1)),
+        Priority.newInstance(2), 0, "mapper", "reducer");
+
+    checkPendingResource(rm, "a1", 14 * GB, "x");
+    checkPendingResource(rm, "a", 14 * GB, "x");
+    checkPendingResource(rm, "b1", 8 * GB, "x");
+    checkPendingResource(rm, "b", 8 * GB, "x");
+    // root = a + b
+    checkPendingResource(rm, "root", 22 * GB, "x");
+
+    // am1 asks for 4 * 1GB container for "x" in priority=1, which should
+    // override 8 * 1GB
+    am1.allocateIntraAppAntiAffinity("x",
+        ResourceSizing.newInstance(4, Resource.newInstance(1 * GB, 1)),
+        Priority.newInstance(1), 0, "mapper", "reducer");
+
+    checkPendingResource(rm, "a1", 10 * GB, "x");
+    checkPendingResource(rm, "a", 10 * GB, "x");
+    checkPendingResource(rm, "b1", 8 * GB, "x");
+    checkPendingResource(rm, "b", 8 * GB, "x");
+    // root = a + b
+    checkPendingResource(rm, "root", 18 * GB, "x");
+
+    // complete am1/am2, pending resource should be 0 now
+    AppAttemptRemovedSchedulerEvent appRemovedEvent =
+        new AppAttemptRemovedSchedulerEvent(am2.getApplicationAttemptId(),
+            RMAppAttemptState.FINISHED, false);
+    rm.getResourceScheduler().handle(appRemovedEvent);
+    appRemovedEvent = new AppAttemptRemovedSchedulerEvent(
+        am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+    rm.getResourceScheduler().handle(appRemovedEvent);
+
+    checkPendingResource(rm, "a1", 0 * GB, null);
+    checkPendingResource(rm, "a", 0 * GB, null);
+    checkPendingResource(rm, "a1", 0 * GB, "x");
+    checkPendingResource(rm, "a", 0 * GB, "x");
+    checkPendingResource(rm, "b1", 0 * GB, null);
+    checkPendingResource(rm, "b", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, "x");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index d2e28be..a800bef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -132,7 +132,7 @@ public class TestIncreaseAllocationExpirer {
     Assert.assertEquals(RMContainerState.RUNNING,
         rm1.getResourceScheduler().getRMContainer(containerId2).getState());
     // Verify container size is 3G
-    Assert.assertEquals(
+      Assert.assertEquals(
         3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
             .getAllocatedResource().getMemorySize());
     // Verify total resource usage

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
new file mode 100644
index 0000000..0a44a1e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchedulingRequestContainerAllocation {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  @Test
+  public void testIntraAppAntiAffinity() throws Exception {
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+        new Configuration());
+    csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+        true);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // 4 NMs.
+    MockNM[] nms = new MockNM[4];
+    RMNode[] rmNodes = new RMNode[4];
+    for (int i = 0; i < 4; i++) {
+      nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+    }
+
+    // app1 -> c
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+    // app1 asks for 10 anti-affinity containers for the same app. It should
+    // only get 4 containers allocated because we only have 4 nodes.
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
+        Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should get 5 containers allocated (1 AM + 1 node each).
+    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+
+    // Similarly, app1 asks for 10 anti-affinity containers at a different
+    // priority, tagged "reducer" and anti-affine to "reducer". As with the
+    // first request, it should only get 4 containers allocated because we
+    // only have 4 nodes.
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
+        Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should get 9 containers allocated (1 AM + 8 containers).
+    Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+
+    // Test anti-affinity to "mapper": every node already hosts a "mapper"
+    // container, so we should get no new container allocated
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
+        Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should still have 9 containers allocated (1 AM + 8 containers).
+    Assert.assertEquals(9, schedulerApp.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+        new Configuration());
+    csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+        true);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // 4 NMs.
+    MockNM[] nms = new MockNM[4];
+    RMNode[] rmNodes = new RMNode[4];
+    for (int i = 0; i < 4; i++) {
+      nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+    }
+
+    // app1 -> c
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+    // app1 asks for 2 anti-affinity containers for the same app.
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+        Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
+        "tag_1_1", "tag_1_2");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should get 3 containers allocated (1 AM + 2 task).
+    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(3, schedulerApp.getLiveContainers().size());
+
+    // app1 asks for 1 anti-affinity container for the same app, anti-affine
+    // to tag_1_1/tag_1_2, with allocation_tag = tag_2_1/tag_2_2
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+        Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
+        "tag_1_1", "tag_1_2");
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should get 4 containers allocated (1 AM + 2 task (first request) +
+    // 1 task (2nd request)).
+    Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
+
+    // app1 asks for 1 anti-affinity container for the same app, anti-affine
+    // to tag_1_1/tag_1_2/tag_2_1/tag_2_2, with allocation_tag = tag_3
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+        Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
+        "tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should get 1 more container allocated:
+    // 1 AM + 2 task (first request) + 1 task (2nd request) +
+    // 1 task (3rd request)
+    Assert.assertEquals(5, schedulerApp.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testSchedulingRequestDisabledByDefault() throws Exception {
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+        new Configuration());
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // 4 NMs.
+    MockNM[] nms = new MockNM[4];
+    RMNode[] rmNodes = new RMNode[4];
+    for (int i = 0; i < 4; i++) {
+      nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+    }
+
+    // app1 -> c
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+    // app1 asks for 2 anti-affinity containers for the same app.
+    boolean caughtException = false;
+    try {
+      // Since feature is disabled by default, we should expect exception.
+      am1.allocateIntraAppAntiAffinity(
+          ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+          Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
+          "tag_1_1", "tag_1_2");
+    } catch (Exception e) {
+      caughtException = true;
+    }
+    Assert.assertTrue(caughtException);
+    rm1.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
new file mode 100644
index 0000000..c7f13cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSchedulingRequestContainerAllocationAsync {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+        new Configuration());
+    csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+        true);
+    csConf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        numThreads);
+    csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 0);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // 200 NMs.
+    int nNMs = 200;
+    MockNM[] nms = new MockNM[nNMs];
+    RMNode[] rmNodes = new RMNode[nNMs];
+    for (int i = 0; i < nNMs; i++) {
+      nms[i] = rm1.registerNode("127.0.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+    }
+
+    // app1 -> c
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+    // app1 asks for 1000 anti-affinity containers for the same app. It should
+    // only get 200 containers allocated because we only have 200 nodes.
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(1000, Resource.newInstance(1024, 1)),
+        Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < nNMs; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    // App1 should get #NM + 1 containers allocated (1 node each + 1 AM).
+    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(nNMs + 1, schedulerApp.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testSingleThreadAsyncContainerAllocation() throws Exception {
+    testIntraAppAntiAffinityAsync(1);
+  }
+
+  @Test(timeout = 300000)
+  public void testTwoThreadsAsyncContainerAllocation() throws Exception {
+    testIntraAppAntiAffinityAsync(2);
+  }
+
+  @Test(timeout = 300000)
+  public void testThreeThreadsAsyncContainerAllocation() throws Exception {
+    testIntraAppAntiAffinityAsync(3);
+  }
+
+  @Test(timeout = 300000)
+  public void testFourThreadsAsyncContainerAllocation() throws Exception {
+    testIntraAppAntiAffinityAsync(4);
+  }
+
+  @Test(timeout = 300000)
+  public void testFiveThreadsAsyncContainerAllocation() throws Exception {
+    testIntraAppAntiAffinityAsync(5);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e8734cc..542ba3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -275,6 +275,8 @@ public class TestUtils {
   public static Configuration getConfigurationWithQueueLabels(Configuration config) {
     CapacitySchedulerConfiguration conf =
         new CapacitySchedulerConfiguration(config);
+    conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+        true);
     
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index f1d5663..7afe4ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -20,10 +20,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
-import java.util.List;
-
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -33,7 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableSet;
+import java.util.List;
 
 /**
  * Test functionality of AllocationTagsManager.
@@ -54,7 +54,6 @@ public class TestAllocationTagsManager {
     rmContext = rm.getRMContext();
   }
 
-
   @Test
   public void testAllocationTagsManagerSimpleCases()
       throws InvalidAllocationTagsQueryException {
@@ -141,30 +140,31 @@ public class TestAllocationTagsManager {
 
     // Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
     // (Expect this returns #containers from app1 on node2)
-    Assert
-        .assertEquals(2,
-            atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-                TestUtils.getMockApplicationId(1),
-                ImmutableSet.of(AllocationTagsNamespaces.APP_ID
-                    + TestUtils.getMockApplicationId(1).toString()),
-                Long::max));
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(1), null, Long::max));
 
     // Get Node Cardinality of app1 on node2, with empty tag set, op=max
     Assert.assertEquals(2,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(1), null, Long::max));
+
+    // Get Cardinality of app1 on node2, with empty tag set, op=max
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
             TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
 
     // Get Node Cardinality of all apps on node2, with empty tag set, op=sum
-    Assert.assertEquals(7, atm.getNodeCardinalityByOp(
+    Assert.assertEquals(4, atm.getNodeCardinalityByOp(
         NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
 
     // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
-    Assert.assertEquals(5,
+    Assert.assertEquals(3,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
             TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
 
     // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
-    Assert.assertEquals(2,
+    Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
             TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
 
@@ -296,7 +296,7 @@ public class TestAllocationTagsManager {
     Assert.assertEquals(3, atm.getRackCardinality("rack0", null, "reducer"));
 
     // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
-    Assert.assertEquals(2, atm.getRackCardinalityByOp("rack0",
+    Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
         TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
 
     // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index 7492233..8ad726e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -117,9 +117,9 @@ public class TestPlacementConstraintsUtil {
       RMNode currentNode = nodeIterator.next();
       FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
           currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
-      Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+      Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
           sourceTag1, schedulerNode, pcm, tm));
-      Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+      Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
           sourceTag2, schedulerNode, pcm, tm));
     }
     /**
@@ -145,14 +145,14 @@ public class TestPlacementConstraintsUtil {
     tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
 
     // 'spark' placement on Node0 should now SUCCEED
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode0, pcm, tm));
     // FAIL on the rest of the nodes
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode1, pcm, tm));
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode2, pcm, tm));
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode3, pcm, tm));
   }
 
@@ -187,15 +187,15 @@ public class TestPlacementConstraintsUtil {
     FiCaSchedulerNode schedulerNode3 = TestUtils
         .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
     // 'zk' placement on Rack1 should now SUCCEED
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode0, pcm, tm));
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode1, pcm, tm));
 
     // FAIL on the rest of the RACKs
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode2, pcm, tm));
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode3, pcm, tm));
   }
 
@@ -230,14 +230,14 @@ public class TestPlacementConstraintsUtil {
     tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
 
     // 'spark' placement on Node0 should now FAIL
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode0, pcm, tm));
     // SUCCEED on the rest of the nodes
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode1, pcm, tm));
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode2, pcm, tm));
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag1, schedulerNode3, pcm, tm));
   }
 
@@ -273,15 +273,15 @@ public class TestPlacementConstraintsUtil {
         .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
 
     // 'zk' placement on Rack1 should FAIL
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode0, pcm, tm));
-    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode1, pcm, tm));
 
     // SUCCEED on the rest of the RACKs
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode2, pcm, tm));
-    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfySingleConstraint(appId1,
         sourceTag2, schedulerNode3, pcm, tm));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 5f29186..b998564 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -192,7 +192,7 @@ public class FairSchedulerTestBase {
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
 
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
+    scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
     scheduler.update();
     return id;
@@ -222,7 +222,7 @@ public class FairSchedulerTestBase {
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
 
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
+    scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
     return id;
   }
@@ -245,7 +245,7 @@ public class FairSchedulerTestBase {
       ResourceRequest request, ApplicationAttemptId attId) {
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ask.add(request);
-    scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(),
+    scheduler.allocate(attId, ask, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
     scheduler.update();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index 95dbaea..2512787 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -125,7 +125,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     List<ResourceRequest> ask = new ArrayList<>();
     ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
     scheduler.allocate(
-        appAttemptId, ask, new ArrayList<ContainerId>(),
+        appAttemptId, ask, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
     FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
 
@@ -163,8 +163,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     ResourceRequest request =
         createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
     ask.add(request);
-    scheduler.allocate(appAttemptId, ask,
-        new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
     triggerSchedulingAttempt();
 
     FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@@ -175,8 +174,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
         createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
     ask.clear();
     ask.add(request);
-    scheduler.allocate(appAttemptId, ask,
-        new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
     triggerSchedulingAttempt();
 
     checkAppConsumption(app, Resources.createResource(2048,2));
@@ -373,7 +371,7 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
         true);
     ask1.add(request1);
     ask1.add(request2);
-    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
+    scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null, null,
         NULL_UPDATE_REQUESTS);
 
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 77b6d04..d9c06a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -1280,7 +1280,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
     asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
 
-    scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
+    scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
             null, NULL_UPDATE_REQUESTS);
 
     ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
@@ -2125,7 +2125,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ResourceRequest request1 = createResourceRequest(minReqSize * 2,
         ResourceRequest.ANY, 1, 1, true);
     ask1.add(request1);
-    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(),
+    scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
 
     // Second ask, queue2 requests 1 large.
@@ -2141,7 +2141,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ResourceRequest.ANY, 1, 1, false);
     ask2.add(request2);
     ask2.add(request3);
-    scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(),
+    scheduler.allocate(id21, ask2, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
 
     // Third ask, queue2 requests 2 small (minReqSize).
@@ -2157,7 +2157,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         ResourceRequest.ANY, 2, 2, true);
     ask3.add(request4);
     ask3.add(request5);
-    scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(),
+    scheduler.allocate(id22, ask3, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
 
     scheduler.update();
@@ -2683,7 +2683,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Complete the first container so we can trigger allocation for app2
     ContainerId containerId =
         app1.getLiveContainers().iterator().next().getContainerId();
-    scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(),
+    scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(), null,
         Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS);
 
     // Trigger allocation for app2
@@ -2769,7 +2769,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
     asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
 
-    scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
+    scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
         null, NULL_UPDATE_REQUESTS);
     
     // node 1 checks in
@@ -3216,7 +3216,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         createResourceRequest(1024, node1.getHostName(), 1, 0, true),
         createResourceRequest(1024, "rack1", 1, 0, true),
         createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
-    scheduler.allocate(attId1, update, new ArrayList<ContainerId>(),
+    scheduler.allocate(attId1, update, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
     
     // then node2 should get the container
@@ -4432,7 +4432,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         createResourceRequest(1024, 8, ResourceRequest.ANY, 1, 1, true);
 
     ask1.add(request1);
-    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null,
+    scheduler.allocate(id11, ask1, null, new ArrayList<ContainerId>(), null,
         null, NULL_UPDATE_REQUESTS);
 
     String hostName = "127.0.0.1";
@@ -4508,11 +4508,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Verify the blacklist can be updated independent of requesting containers
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
-        Collections.<ContainerId>emptyList(),
+        null, Collections.<ContainerId>emptyList(),
         Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     assertTrue(app.isPlaceBlacklisted(host));
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
-        Collections.<ContainerId>emptyList(), null,
+        null, Collections.<ContainerId>emptyList(), null,
         Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     assertFalse(scheduler.getSchedulerApp(appAttemptId)
         .isPlaceBlacklisted(host));
@@ -4521,8 +4521,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         createResourceRequest(GB, node.getHostName(), 1, 0, true));
 
     // Verify a container does not actually get placed on the blacklisted host
-    scheduler.allocate(appAttemptId, update,
-        Collections.<ContainerId>emptyList(),
+    scheduler.allocate(appAttemptId, update, null, Collections.<ContainerId>emptyList(),
         Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     assertTrue(app.isPlaceBlacklisted(host));
     scheduler.update();
@@ -4531,8 +4530,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .getLiveContainers().size());
 
     // Verify a container gets placed on the empty blacklist
-    scheduler.allocate(appAttemptId, update,
-        Collections.<ContainerId>emptyList(), null,
+    scheduler.allocate(appAttemptId, update, null, Collections.<ContainerId>emptyList(), null,
         Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     assertFalse(app.isPlaceBlacklisted(host));
     createSchedulingRequest(GB, "root.default", "user", 1);
@@ -5391,8 +5389,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ask1.add(request3);
 
     // Perform allocation
-    scheduler.allocate(appAttemptId, ask1, new ArrayList<ContainerId>(), null,
-        null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(appAttemptId, ask1, null, new ArrayList<ContainerId>(),
+        null, null, NULL_UPDATE_REQUESTS);
     scheduler.update();
     scheduler.handle(new NodeUpdateSchedulerEvent(node));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index db749ac..8814c0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -281,7 +281,7 @@ public class TestFifoScheduler {
     ask.add(nodeLocal);
     ask.add(rackLocal);
     ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
+    scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
 
     NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
@@ -378,7 +378,7 @@ public class TestFifoScheduler {
     ask.add(nodeLocal);
     ask.add(rackLocal);
     ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
+    scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(),
         null, null, NULL_UPDATE_REQUESTS);
 
     // Before the node update event, there are one local request
@@ -954,7 +954,7 @@ public class TestFifoScheduler {
     ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1,
         RMNodeLabelsManager.NO_LABEL));
-    fs.allocate(appAttemptId1, ask1, emptyId,
+    fs.allocate(appAttemptId1, ask1, null, emptyId,
         Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS);
 
     // Trigger container assignment
@@ -963,7 +963,7 @@ public class TestFifoScheduler {
     // Get the allocation for the application and verify no allocation on
     // blacklist node
     Allocation allocation1 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
 
     Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
@@ -971,7 +971,7 @@ public class TestFifoScheduler {
     // verify host_1_1 can get allocated as not in blacklist
     fs.handle(new NodeUpdateSchedulerEvent(n4));
     Allocation allocation2 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
     List<Container> containerList = allocation2.getContainers();
@@ -986,33 +986,33 @@ public class TestFifoScheduler {
     // be assigned
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
-    fs.allocate(appAttemptId1, ask2, emptyId,
+    fs.allocate(appAttemptId1, ask2, null, emptyId,
         Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS);
 
     // verify n1 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n1));
     Allocation allocation3 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
 
     // verify n2 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n2));
     Allocation allocation4 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
 
     // verify n3 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n3));
     Allocation allocation5 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
 
     fs.handle(new NodeUpdateSchedulerEvent(n4));
     Allocation allocation6 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
 
@@ -1072,14 +1072,14 @@ public class TestFifoScheduler {
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
-    fs.allocate(appAttemptId1, ask1, emptyId,
+    fs.allocate(appAttemptId1, ask1, null, emptyId,
         null, null, NULL_UPDATE_REQUESTS);
 
     // Ask for a 2 GB container for app 2
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
-    fs.allocate(appAttemptId2, ask2, emptyId,
+    fs.allocate(appAttemptId2, ask2, null, emptyId,
         null, null, NULL_UPDATE_REQUESTS);
 
     // Trigger container assignment
@@ -1087,13 +1087,13 @@ public class TestFifoScheduler {
 
     // Get the allocation for the applications and verify headroom
     Allocation allocation1 =
-        fs.allocate(appAttemptId1, emptyAsk, emptyId,
+        fs.allocate(appAttemptId1, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("Allocation headroom", 1 * GB, allocation1
         .getResourceLimit().getMemorySize());
 
     Allocation allocation2 =
-        fs.allocate(appAttemptId2, emptyAsk, emptyId,
+        fs.allocate(appAttemptId2, emptyAsk, null, emptyId,
             null, null, NULL_UPDATE_REQUESTS);
     Assert.assertEquals("Allocation headroom", 1 * GB, allocation2
         .getResourceLimit().getMemorySize());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
new file mode 100644
index 0000000..479d2c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.function.LongBinaryOperator;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test behaviors of single constraint app placement allocator.
+ */
+public class TestSingleConstraintAppPlacementAllocator {
+  private AppSchedulingInfo appSchedulingInfo;
+  private AllocationTagsManager spyAllocationTagsManager;
+  private RMContext rmContext;
+  private SchedulerRequestKey schedulerRequestKey;
+  private SingleConstraintAppPlacementAllocator allocator;
+
+  @Before
+  public void setup() throws Exception {
+    // Mock the app scheduling info with a fixed application / attempt id.
+    appSchedulingInfo = mock(AppSchedulingInfo.class);
+    when(appSchedulingInfo.getApplicationId()).thenReturn(
+        TestUtils.getMockApplicationId(1));
+    when(appSchedulingInfo.getApplicationAttemptId()).thenReturn(
+        TestUtils.getMockApplicationAttemptId(1, 1));
+
+    // RMContext comes from the shared TestUtils helper.
+    rmContext = TestUtils.getMockRMContext();
+
+    // Wrap a real allocation tags manager in a spy so tests can verify calls.
+    AllocationTagsManager allocationTagsManager = new AllocationTagsManager(
+        rmContext);
+    spyAllocationTagsManager = spy(allocationTagsManager);
+    schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L,
+        TestUtils.getMockContainerId(1, 1));
+    rmContext.setAllocationTagsManager(spyAllocationTagsManager);
+
+    // Create the allocator under test and initialize it with the fixtures.
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+  }
+
+  private void assertValidSchedulingRequest(
+      SchedulingRequest schedulingRequest) {
+    // Use a fresh allocator so state from earlier requests cannot interfere.
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+  }
+
+  private void assertInvalidSchedulingRequest(
+      SchedulingRequest schedulingRequest, boolean recreateAllocator) {
+    try {
+      // Start from a fresh allocator only when the caller asks for one.
+      if (recreateAllocator) {
+        allocator = new SingleConstraintAppPlacementAllocator();
+        allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+      }
+      allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+    } catch (SchedulerInvalidResoureRequestException e) {
+      // Expected: the allocator rejected the request.
+      return;
+    }
+    Assert.fail(
+        "Expect failure for schedulingRequest=" + schedulingRequest.toString());
+  }
+
+  @Test
+  public void testSchedulingRequestValidation() {
+    // Valid (intra-app tags with an explicit empty node partition)
+    assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build());
+    Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+        allocator.getTargetAllocationTags());
+    Assert.assertEquals("", allocator.getTargetNodePartition());
+
+    // Valid (with partition)
+    assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition("x"))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build());
+    Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+        allocator.getTargetAllocationTags());
+    Assert.assertEquals("x", allocator.getTargetNodePartition());
+
+    // Valid (without specifying node partition)
+    assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer")).build())
+        .resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build());
+    Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+        allocator.getTargetAllocationTags());
+    Assert.assertEquals("", allocator.getTargetNodePartition());
+
+    // Valid (with application Id target). NOTE(review): request equals above
+    assertValidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer")).build())
+        .resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build());
+    // Allocation tags should not include application Id
+    Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
+        allocator.getTargetAllocationTags());
+    Assert.assertEquals("", allocator.getTargetNodePartition());
+
+    // Invalid (without sizing)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer")).build())
+        .build(), true);
+
+    // Invalid (without target tags)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1).build())
+        .build(), true);
+
+    // Invalid (with multiple allocation tags expression specified)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper"),
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build(), true);
+
+    // Invalid (multiple node partitions intended; NOTE(review): only one given)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper"),
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp(""),
+                PlacementConstraints.PlacementTargets.nodePartition("x"))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build(), true);
+
+    // Invalid (not anti-affinity cardinality: 1, 2)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 1, 2,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build(), true);
+
+    // Invalid (not anti-affinity cardinality: 0, 2)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 2,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build(), true);
+
+    // Invalid (not NODE scope)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.RACK, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build(), true);
+
+    // Invalid (not GUARANTEED)
+    assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build(), true);
+  }
+
+  @Test
+  public void testSchedulingRequestUpdate() {
+    SchedulingRequest schedulingRequest =
+        SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(10L).priority(Priority.newInstance(1))
+            .placementConstraintExpression(PlacementConstraints
+                .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                    PlacementConstraints.PlacementTargets
+                        .allocationTagToIntraApp("mapper", "reducer"),
+                    PlacementConstraints.PlacementTargets.nodePartition(""))
+                .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+            .build();
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+    // Update allocator with exactly the same scheduling request, should succeed.
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+    // Update allocator with a scheduling request that differs in #allocations,
+    // should succeed.
+    schedulingRequest.getResourceSizing().setNumAllocations(10);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+
+    // Update allocator with a scheduling request that differs in resource,
+    // should fail.
+    schedulingRequest.getResourceSizing().setResources(
+        Resource.newInstance(2048, 1));
+    assertInvalidSchedulingRequest(schedulingRequest, false);
+
+    // Update allocator with a different placement target (allocation tag),
+    // should fail.
+    schedulingRequest = SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build();
+    assertInvalidSchedulingRequest(schedulingRequest, false);
+
+    // Update allocator with recover == true, #allocations should grow by 1
+    int existingNumAllocations =
+        allocator.getSchedulingRequest().getResourceSizing()
+            .getNumAllocations();
+    schedulingRequest = SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition(""))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build();
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, true);
+    Assert.assertEquals(existingNumAllocations + 1,
+        allocator.getSchedulingRequest().getResourceSizing()
+            .getNumAllocations());
+  }
+
+  @Test
+  public void testFunctionality() throws InvalidAllocationTagsQueryException {
+    SchedulingRequest schedulingRequest =
+        SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(10L).priority(Priority.newInstance(1))
+            .placementConstraintExpression(PlacementConstraints
+                .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                    PlacementConstraints.PlacementTargets
+                        .allocationTagToIntraApp("mapper", "reducer"),
+                    PlacementConstraints.PlacementTargets.nodePartition(""))
+                .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+            .build();
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+    allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNode("host1", "/rack1", 123, 1024));
+    verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp(
+        eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)),
+        eq(ImmutableSet.of("mapper", "reducer")),
+        any(LongBinaryOperator.class));
+
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    // Same constraint on a fresh allocator, now targeting node partition "x"
+    schedulingRequest = SchedulingRequest.newBuilder().executionType(
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+        .allocationRequestId(10L).priority(Priority.newInstance(1))
+        .placementConstraintExpression(PlacementConstraints
+            .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                PlacementConstraints.PlacementTargets
+                    .allocationTagToIntraApp("mapper", "reducer"),
+                PlacementConstraints.PlacementTargets.nodePartition("x"))
+            .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+        .build();
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+    allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNode("host1", "/rack1", 123, 1024));
+    verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp(
+        eq(NodeId.fromString("host1:123")),
+        eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet
+            .of("mapper", "reducer")), any(LongBinaryOperator.class));
+
+    SchedulerNode node1 = mock(SchedulerNode.class);
+    when(node1.getPartition()).thenReturn("x");
+    when(node1.getNodeID()).thenReturn(NodeId.fromString("host1:123"));
+    // node1 reports partition "x", matching the request: precheck must pass.
+    Assert.assertTrue(allocator
+        .precheckNode(node1, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+    // node2 reports partition "" (stub node2 itself): precheck must reject it.
+    SchedulerNode node2 = mock(SchedulerNode.class);
+    when(node2.getPartition()).thenReturn("");
+    when(node2.getNodeID()).thenReturn(NodeId.fromString("host2:123"));
+    Assert.assertFalse(allocator
+        .precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message