hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [10/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
Date Fri, 11 Nov 2016 00:10:37 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 51b567b..8694efb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -196,6 +199,7 @@ public class TestLeafQueue {
 
     cs.setRMContext(spyRMContext);
     cs.init(csConf);
+    cs.setResourceCalculator(rC);
     cs.start();
 
     when(spyRMContext.getScheduler()).thenReturn(cs);
@@ -268,6 +272,12 @@ public class TestLeafQueue {
         any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), 
         any(RMContainer.class), any(ContainerStatus.class), 
         any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
+
+    // Stub out parent queue's accept and apply.
+    doReturn(true).when(parent).accept(any(Resource.class),
+        any(ResourceCommitRequest.class));
+    doNothing().when(parent).apply(any(Resource.class),
+        any(ResourceCommitRequest.class));
     
     return queue;
   }
@@ -339,6 +349,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
         8*GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
+
     final int numNodes = 1;
     Resource clusterResource = 
         Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -353,8 +369,10 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(
         (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
@@ -526,6 +544,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
         8*GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
+
     final int numNodes = 1;
     Resource clusterResource = 
         Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -544,8 +568,10 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(1*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -555,8 +581,10 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -564,8 +592,10 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Can't allocate 3rd due to user-limit
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -574,8 +604,10 @@ public class TestLeafQueue {
     
     // Bump up user-limit-factor, now allocate should work
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(3*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -583,8 +615,10 @@ public class TestLeafQueue {
     assertEquals(3*GB, a.getMetrics().getAllocatedMB());
 
     // One more should work, for app_1, due to user-limit-factor
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(4*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -594,8 +628,10 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(4*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -687,6 +723,13 @@ public class TestLeafQueue {
         assign.getResource().getMemorySize() > 0);
   }
 
+  private void applyCSAssignment(Resource clusterResource, CSAssignment assign,
+      LeafQueue q, final Map<NodeId, FiCaSchedulerNode> nodes,
+      final Map<ApplicationAttemptId, FiCaSchedulerApp> apps)
+      throws IOException {
+    TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps);
+  }
+
   @Test
   public void testDRFUserLimits() throws Exception {
     setUpWithDominantResourceCalculator();
@@ -723,6 +766,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node1 =
         TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);
 
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
+        node0, node1.getNodeID(), node1);
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app0.getApplicationAttemptId(), app0, app2.getApplicationAttemptId(),
+        app2);
+
     int numNodes = 2;
     Resource clusterResource =
         Resources.createResource(numNodes * (8 * GB), numNodes * 100);
@@ -758,12 +807,14 @@ public class TestLeafQueue {
           b.assignContainers(clusterResource, node0, new ResourceLimits(
               clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       LOG.info(assign.toString());
+      applyCSAssignment(clusterResource, assign, b, nodes, apps);
     } while (assign.getResource().getMemorySize() > 0 &&
         assign.getAssignmentInformation().getNumReservations() == 0);
     do {
       assign =
           b.assignContainers(clusterResource, node1, new ResourceLimits(
               clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assign, b, nodes, apps);
     } while (assign.getResource().getMemorySize() > 0 &&
         assign.getAssignmentInformation().getNumReservations() == 0);
     //LOG.info("user_0: " + queueUser0.getUsed());
@@ -847,6 +898,12 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
             priority, recordFactory)));
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
     /**
      * Start testing...
      */
@@ -859,8 +916,10 @@ public class TestLeafQueue {
     assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(3*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -868,8 +927,10 @@ public class TestLeafQueue {
     // Allocate one container to app_1. Even if app_0
     // submit earlier, it cannot get this container assigned since user_0
     // exceeded user-limit already. 
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(4*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -877,8 +938,10 @@ public class TestLeafQueue {
     // Allocate one container to app_0, before allocating this container,
     // user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <=
     // user-limit.
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(7*GB, a.getUsedResources().getMemorySize());
     assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -890,7 +953,7 @@ public class TestLeafQueue {
   }
 
   @Test
-  public void testComputeUserLimitAndSetHeadroom(){
+  public void testComputeUserLimitAndSetHeadroom() throws IOException {
     LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
     qb.setMaxCapacity(1.0f);
     // Users
@@ -903,6 +966,9 @@ public class TestLeafQueue {
     String host_1 = "127.0.0.2";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
 
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
     final int numNodes = 2;
     Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
@@ -925,6 +991,8 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
             qb.getActiveUsersManager(), spyRMContext);
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
+    apps.put(app_0.getApplicationAttemptId(), app_0);
     qb.submitApplicationAttempt(app_0, user_0);
     Priority u0Priority = TestUtils.createMockPriority(1);
     SchedulerRequestKey u0SchedKey = toSchedulerKey(u0Priority);
@@ -935,8 +1003,10 @@ public class TestLeafQueue {
     assertEquals("There should only be 1 active user!",
         1, qb.getActiveUsersManager().getNumActiveUsers());
     //get headroom
-    qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        qb.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
         "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
@@ -949,14 +1019,17 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
             qb.getActiveUsersManager(), spyRMContext);
+    apps.put(app_2.getApplicationAttemptId(), app_2);
     Priority u1Priority = TestUtils.createMockPriority(2);
     SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
             u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_2, user_1);
-    qb.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        qb.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
         "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
@@ -984,11 +1057,13 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
             qb.getActiveUsersManager(), spyRMContext);
+    apps.put(app_1.getApplicationAttemptId(), app_1);
     final ApplicationAttemptId appAttemptId_3 =
         TestUtils.getMockApplicationAttemptId(3, 0);
     FiCaSchedulerApp app_3 =
         new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
             qb.getActiveUsersManager(), spyRMContext);
+    apps.put(app_3.getApplicationAttemptId(), app_3);
     app_1.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
             u0Priority, recordFactory)));
@@ -997,10 +1072,14 @@ public class TestLeafQueue {
              u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_1, user_0);
     qb.submitApplicationAttempt(app_3, user_1);
-    qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        qb.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
+    applyCSAssignment(clusterResource,
+        qb.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
         "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, qb.getUsedResources().getMemorySize());
@@ -1013,12 +1092,15 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_4 =
               new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
                       qb.getActiveUsersManager(), spyRMContext);
+    apps.put(app_4.getApplicationAttemptId(), app_4);
     qb.submitApplicationAttempt(app_4, user_0);
     app_4.updateResourceRequests(Collections.singletonList(
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                       u0Priority, recordFactory)));
-    qb.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        qb.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
         "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
@@ -1078,6 +1160,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 
       0, 16*GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1, app_2.getApplicationAttemptId(), app_2);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
     final int numNodes = 2;
     Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
@@ -1088,8 +1176,10 @@ public class TestLeafQueue {
             TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
                 priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(1*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1105,8 +1195,10 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
             priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1165,6 +1257,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
     String host_1 = "127.0.0.2";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1, app_2.getApplicationAttemptId(), app_2);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
     
     final int numNodes = 2;
     Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
@@ -1194,8 +1292,10 @@ public class TestLeafQueue {
         1, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1206,8 +1306,10 @@ public class TestLeafQueue {
       // the application is not yet active
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(3*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1223,8 +1325,10 @@ public class TestLeafQueue {
 
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(3*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1237,8 +1341,10 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(0*GB, app_2.getHeadroom().getMemorySize());   // hit queue max-cap
   }
 
@@ -1283,10 +1389,17 @@ public class TestLeafQueue {
         new FiCaSchedulerApp(appAttemptId_3, user_2, a, 
             a.getActiveUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_3, user_2);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1, app_2.getApplicationAttemptId(), app_2,
+        app_3.getApplicationAttemptId(), app_3);
     
     // Setup some nodes
     String host_0 = "127.0.0.1";
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
     
     final int numNodes = 1;
     Resource clusterResource = 
@@ -1308,24 +1421,30 @@ public class TestLeafQueue {
      */
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(1*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     
     // Can't allocate 3rd due to user-limit
     a.setUserLimit(25);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1343,8 +1462,10 @@ public class TestLeafQueue {
     // Now allocations should goto app_2 since 
     // user_0 is at limit inspite of high user-limit-factor
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(5*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1353,8 +1474,10 @@ public class TestLeafQueue {
 
     // Now allocations should goto app_0 since 
     // user_0 is at user-limit not above it
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(6*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1364,8 +1487,10 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(6*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1376,8 +1501,10 @@ public class TestLeafQueue {
     // Now, allocations should goto app_3 since it's under user-limit 
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(1);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(7*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1385,8 +1512,10 @@ public class TestLeafQueue {
     assertEquals(1*GB, app_3.getCurrentConsumption().getMemorySize());
 
     // Now we should assign to app_3 again since user_2 is under user-limit
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(8*GB, a.getUsedResources().getMemorySize());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1466,6 +1595,12 @@ public class TestLeafQueue {
     // Setup some nodes
     String host_0 = "127.0.0.1";
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
     
     final int numNodes = 2;
     Resource clusterResource = 
@@ -1485,8 +1620,10 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(1*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1496,8 +1633,10 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1505,8 +1644,10 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(6*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1522,8 +1663,10 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(5*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1539,8 +1682,10 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(4*GB, a.getUsedResources().getMemorySize());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1587,6 +1732,12 @@ public class TestLeafQueue {
     
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
     
     final int numNodes = 3;
     Resource clusterResource = 
@@ -1613,23 +1764,29 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(1*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(2*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(6*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1643,8 +1800,10 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(5*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1653,8 +1812,10 @@ public class TestLeafQueue {
     assertEquals(1, app_1.getReReservations(toSchedulerKey(priority)));
 
     // Re-reserve
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(5*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1663,8 +1824,10 @@ public class TestLeafQueue {
     assertEquals(2, app_1.getReReservations(toSchedulerKey(priority)));
     
     // Try to schedule on node_1 now, should *move* the reservation
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(9*GB, a.getUsedResources().getMemorySize());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1681,8 +1844,10 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(4*GB, a.getUsedResources().getMemorySize());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -1735,6 +1900,15 @@ public class TestLeafQueue {
     String rack_2 = "rack_2";
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
 
+    String host_3 = "127.0.0.4"; // on rack_1
+    FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2,
+        node_3.getNodeID(), node_3);
+
     final int numNodes = 3;
     Resource clusterResource = 
         Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -1767,6 +1941,7 @@ public class TestLeafQueue {
     // Start with off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
@@ -1775,6 +1950,7 @@ public class TestLeafQueue {
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
@@ -1783,6 +1959,7 @@ public class TestLeafQueue {
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
@@ -1792,6 +1969,7 @@ public class TestLeafQueue {
     // since missedOpportunities=3 and reqdContainers=3
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     // should NOT reset
     assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey));
@@ -1800,6 +1978,7 @@ public class TestLeafQueue {
     // NODE_LOCAL - node_0
     assignment = a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -1808,6 +1987,7 @@ public class TestLeafQueue {
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -1828,9 +2008,6 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
     assertEquals(4, app_0.getTotalRequiredResources(schedulerKey));
     
-    String host_3 = "127.0.0.4"; // on rack_1
-    FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
-    
     // Rack-delay
     doReturn(true).when(a).getRackLocalityFullReset();
     doReturn(1).when(a).getNodeLocalityDelay();
@@ -1838,12 +2015,14 @@ public class TestLeafQueue {
     // Shouldn't assign RACK_LOCAL yet
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(4, app_0.getTotalRequiredResources(schedulerKey));
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -1852,6 +2031,7 @@ public class TestLeafQueue {
     // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset.
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
 
@@ -1861,6 +2041,7 @@ public class TestLeafQueue {
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should NOT reset
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
@@ -1869,6 +2050,7 @@ public class TestLeafQueue {
     // Another RACK_LOCAL since schedulingOpportunities not reset
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should NOT reset
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
@@ -1894,12 +2076,14 @@ public class TestLeafQueue {
       assignment =
           a.assignContainers(clusterResource, node_2, new ResourceLimits(
               clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, a, nodes, apps);
       verifyNoContainerAllocated(assignment);
       assertEquals(i+1, app_0.getSchedulingOpportunities(schedulerKey));
     }
     // delay should be capped at numNodes so next one should allocate
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
   }
@@ -1933,6 +2117,11 @@ public class TestLeafQueue {
     String rack_2 = "rack_2";
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     final int numNodes = 3;
     Resource clusterResource = 
         Resources.createResource(numNodes * (8*GB), 1);
@@ -1981,6 +2170,7 @@ public class TestLeafQueue {
     // thus, no P2 either!
     CSAssignment assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey1));
     assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1));
@@ -1991,6 +2181,7 @@ public class TestLeafQueue {
     // thus, no P2 either!
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey1));
     assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1));
@@ -2000,6 +2191,7 @@ public class TestLeafQueue {
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey1));
     assertEquals(1, app_0.getTotalRequiredResources(schedulerKey1));
@@ -2009,6 +2201,7 @@ public class TestLeafQueue {
     // Now, DATA_LOCAL for P1
     assignment = a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
     assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1));
@@ -2018,6 +2211,7 @@ public class TestLeafQueue {
     // Now, OFF_SWITCH for P2
     assignment = a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
     assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1));
@@ -2054,6 +2248,12 @@ public class TestLeafQueue {
     String host_1_0 = "127.0.0.3";
     String rack_1 = "rack_1";
     FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(),
+        node_0_0, node_0_1.getNodeID(), node_0_1, node_1_0.getNodeID(),
+        node_1_0);
     
     final int numNodes = 3;
     Resource clusterResource = Resources.createResource(
@@ -2093,6 +2293,7 @@ public class TestLeafQueue {
     // NODE_LOCAL - node_0_1
     CSAssignment assignment = a.assignContainers(clusterResource, node_0_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
     // should reset
@@ -2102,6 +2303,7 @@ public class TestLeafQueue {
     // required(ANY) == 0
     assignment = a.assignContainers(clusterResource, node_1_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     // Still zero
     // since #req=0
@@ -2119,6 +2321,7 @@ public class TestLeafQueue {
     // required(rack_1) == 0
     assignment = a.assignContainers(clusterResource, node_0_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
@@ -2126,6 +2329,7 @@ public class TestLeafQueue {
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -2319,18 +2523,25 @@ public class TestLeafQueue {
             mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+
     // Setup some nodes and racks
     String host_0_0 = "127.0.0.1";
     String rack_0 = "rack_0";
     String host_0_1 = "127.0.0.2";
     FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
-    
-    
+
     String host_1_0 = "127.0.0.3";
     String rack_1 = "rack_1";
     FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
     String host_1_1 = "127.0.0.4";
     FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB);
+
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_1.getNodeID(),
+        node_0_1, node_1_0.getNodeID(), node_1_0, node_1_1.getNodeID(),
+        node_1_1);
     
     final int numNodes = 4;
     Resource clusterResource = Resources.createResource(
@@ -2380,6 +2591,7 @@ public class TestLeafQueue {
     CSAssignment assignment =
         a.assignContainers(clusterResource, node_0_1, new ResourceLimits(
             clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     // should be 0
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -2403,6 +2615,7 @@ public class TestLeafQueue {
     // Shouldn't allocate since RR(rack_1) = relax: false
     assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     // should be 0
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -2434,6 +2647,7 @@ public class TestLeafQueue {
     // Shouldn't allocate since node_1_1 is blacklisted
     assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     // should be 0
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -2461,8 +2675,10 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
-    assignment = a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assignment = a.assignContainers(clusterResource, node_1_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     // should be 0
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
@@ -2490,6 +2706,7 @@ public class TestLeafQueue {
     // Now, should allocate since RR(rack_1) = relax: true
     assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
@@ -2520,6 +2737,7 @@ public class TestLeafQueue {
 
     assignment = a.assignContainers(clusterResource, node_1_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
@@ -2590,6 +2808,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 =
         TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
+
     final int numNodes = 1;
     Resource clusterResource =
         Resources.createResource(numNodes * (8 * GB), numNodes * 16);
@@ -2603,91 +2827,111 @@ public class TestLeafQueue {
             recordFactory)));
 
     try {
-      a.assignContainers(clusterResource, node_0, 
-          new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource,
+          a.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     } catch (NullPointerException e) {
       Assert.fail("NPE when allocating container on node but "
           + "forget to set off-switch request should be handled");
     }
   }
-  
-    @Test
+
+  @Test
   public void testFifoAssignment() throws Exception {
 
-    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
-    
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
     a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
 
     String host_0_0 = "127.0.0.1";
     String rack_0 = "rack_0";
-    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
-    
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0,
+        16 * GB);
+
     final int numNodes = 4;
-    Resource clusterResource = Resources.createResource(
-        numNodes * (16*GB), numNodes * 16);
+    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
+        numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
     String user_0 = "user_0";
-    
-    final ApplicationAttemptId appAttemptId_0 = 
-        TestUtils.getMockApplicationAttemptId(0, 0); 
-    FiCaSchedulerApp app_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext,
-            Priority.newInstance(3), false));
+
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
+        false));
     a.submitApplicationAttempt(app_0, user_0);
-    
-    final ApplicationAttemptId appAttemptId_1 = 
-        TestUtils.getMockApplicationAttemptId(1, 0); 
-    FiCaSchedulerApp app_1 = 
-        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext,
-            Priority.newInstance(5), false));
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
+        false));
     a.submitApplicationAttempt(app_1, user_0);
- 
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(),
+        node_0_0);
+
     Priority priority = TestUtils.createMockPriority(1);
     List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
     List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
-    
+
     app_0_requests_0.clear();
-    app_0_requests_0.add(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, 
-            true, priority, recordFactory));
+    app_0_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
+            recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
-    
+
     app_1_requests_0.clear();
-    app_1_requests_0.add(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
-            true, priority, recordFactory));
+    app_1_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
     app_1.updateResourceRequests(app_1_requests_0);
 
     // app_1 will get containers as it has high priority
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(1 * GB, app_1.getCurrentConsumption().getMemorySize());
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
 
     app_0_requests_0.clear();
-    app_0_requests_0.add(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
-            true, priority, recordFactory));
+    app_0_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
 
     app_1_requests_0.clear();
-    app_1_requests_0.add(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
-            true, priority, recordFactory));
+    app_1_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
     app_1.updateResourceRequests(app_1_requests_0);
 
     //app_1 will still get assigned first as priority is more.
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
-    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
 
     //and only then will app_2
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
-}
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
+  }
+
   @Test
   public void testConcurrentAccess() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
@@ -2792,6 +3036,12 @@ public class TestLeafQueue {
             mock(ActiveUsersManager.class), spyRMContext));
     a.submitApplicationAttempt(app_1, user_0);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(),
+        node_0_0);
+
     Priority priority = TestUtils.createMockPriority(1);
     List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
     List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
@@ -2809,9 +3059,15 @@ public class TestLeafQueue {
     app_1.updateResourceRequests(app_1_requests_0);
 
     // app_0 will get containers as its submitted first.
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
 
     app_0_requests_0.clear();
@@ -2828,12 +3084,18 @@ public class TestLeafQueue {
 
     //Since it already has more resources, app_0 will not get
     //assigned first, but app_1 will
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
 
     //and only then will app_0
-    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
 
   }
@@ -2874,6 +3136,12 @@ public class TestLeafQueue {
     String rack_2 = "rack_2";
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     final int numNodes = 3;
     Resource clusterResource = 
         Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -2922,6 +3190,7 @@ public class TestLeafQueue {
     // Check app_0's scheduling opportunities increased and app_1 get allocated
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
     assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
@@ -2965,6 +3234,12 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
         100*GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
+
     final int numNodes = 1;
     Resource clusterResource =
         Resources.createResource(numNodes * (100*GB), numNodes * 128);
@@ -2983,9 +3258,10 @@ public class TestLeafQueue {
     // Start testing...
 
     // Assign 1st Container of 1GB
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
     // all users (only user_0) queue 'e' should be able to consume 1GB.
     // The first container should be assigned to app_0 with no headroom left
@@ -2996,9 +3272,10 @@ public class TestLeafQueue {
                    clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
 
     // Assign 2nd container of 1GB
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // user_0 has no headroom due to user-limit-factor of 1.0. However capacity
     // scheduler will assign one container more than user-limit-factor.
     // This container also went to app_0. Still with no neadroom even though
@@ -3009,9 +3286,10 @@ public class TestLeafQueue {
         clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
 
     // Can't allocate 3rd container due to user-limit. Headroom still 0.
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
@@ -3025,9 +3303,10 @@ public class TestLeafQueue {
     assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit(
         clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
 
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // app_0 is now satisified, app_1 is still asking for 2GB.
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@@ -3035,12 +3314,14 @@ public class TestLeafQueue {
         clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
 
     // Get the last 2 containers for app_1, no more pending requests.
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
@@ -3112,10 +3393,17 @@ public class TestLeafQueue {
             mock(ActiveUsersManager.class), spyRMContext);
     e.submitApplicationAttempt(app_3, user_1);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1, app_2.getApplicationAttemptId(), app_2,
+        app_3.getApplicationAttemptId(), app_3);
+
     // Setup 1 node with 100GB of memory resources.
     String host_0 = "127.0.0.1";
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
         100*GB);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0);
 
     final int numNodes = 1;
     Resource clusterResource =
@@ -3156,9 +3444,10 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
 
     // Assign 1st Container of 1GB
-    e.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // The first container was assigned to user_0's app_0. Queues total headroom
     // has 1GB left for user_1.
     assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
@@ -3171,9 +3460,10 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
 
     // Assign 2nd container of 1GB
-    e.assignContainers(clusterResource, node_0,
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // user_0 has no headroom due to user-limit-factor of 1.0. However capacity
     // scheduler will assign one container more than user-limit-factor. So,
     // this container went to user_0's app_1. so, headroom for queue 'e'e is
@@ -3188,9 +3478,10 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
 
     // Assign 3rd container.
-    e.assignContainers(clusterResource, node_0,
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // Container was allocated to user_1's app_2 since user_1, Now, no headroom
     // is left.
     assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
@@ -3203,9 +3494,10 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
 
     // Assign 4th container.
-    e.assignContainers(clusterResource, node_0,
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // Allocated to user_1's app_2 since scheduler allocates 1 container
     // above user resource limit. Available headroom still 0.
     assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
@@ -3222,9 +3514,10 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_3_consumption);
 
     // Attempt to assign 5th container. Will be a no-op.
-    e.assignContainers(clusterResource, node_0,
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // Cannot allocate 5th container because both users are above their allowed
     // user resource limit. Values should be the same as previously.
     assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
@@ -3241,9 +3534,10 @@ public class TestLeafQueue {
     // factor is no longer the limiting factor.
     e.setUserLimitFactor(10.0f);
 
-    e.assignContainers(clusterResource, node_0,
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // Next container goes to user_0's app_1, since it still wanted 1GB.
     assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
         clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
@@ -3254,9 +3548,10 @@ public class TestLeafQueue {
     assertEquals(2*GB, app_2.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
 
-    e.assignContainers(clusterResource, node_0,
+    applyCSAssignment(clusterResource,
+        e.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
     // Last container goes to user_1's app_3, since it still wanted 1GB.
     // user_0's apps:
     assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message