hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [1/7] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
Date Mon, 07 Nov 2016 18:41:26 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk acd509dc5 -> de3b4aac5


http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 42a8872..d875969 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -49,8 +49,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -123,6 +127,27 @@ public class TestParentQueue {
     return application;
   }
 
+  private void applyAllocationToQueue(Resource clusterResource,
+      int allocatedMem,
+      CSQueue queue) {
+    // Call accept & apply for queue
+    ResourceCommitRequest request = mock(ResourceCommitRequest.class);
+    when(request.anythingAllocatedOrReserved()).thenReturn(true);
+    ContainerAllocationProposal allocation = mock(
+        ContainerAllocationProposal.class);
+    when(request.getTotalReleasedResource()).thenReturn(Resources.none());
+    when(request.getFirstAllocatedOrReservedContainer()).thenReturn(allocation);
+    SchedulerContainer scontainer = mock(SchedulerContainer.class);
+    when(allocation.getAllocatedOrReservedContainer()).thenReturn(scontainer);
+    when(allocation.getAllocatedOrReservedResource()).thenReturn(
+        Resources.createResource(allocatedMem));
+    when(scontainer.getNodePartition()).thenReturn("");
+
+    if (queue.accept(clusterResource, request)) {
+      queue.apply(clusterResource, request);
+    }
+  }
+
   private void stubQueueAllocation(final CSQueue queue, 
       final Resource clusterResource, final FiCaSchedulerNode node, 
       final int allocation) {
@@ -157,7 +182,7 @@ public class TestParentQueue {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).when(queue)
-              .assignContainers(eq(clusterResource), eq(node),
+              .assignContainers(eq(clusterResource), any(PlacementSet.class),
                   any(ResourceLimits.class), any(SchedulingMode.class));
 
           // Mock the node's resource availability
@@ -168,7 +193,7 @@ public class TestParentQueue {
 
         return new CSAssignment(allocatedResource, type);
       }
-    }).when(queue).assignContainers(eq(clusterResource), eq(node),
+    }).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
         any(ResourceLimits.class), any(SchedulingMode.class));
   }
   
@@ -205,8 +230,8 @@ public class TestParentQueue {
     setupSingleLevelQueues(csConf);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, 
+    CSQueue root =
+        CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues, 
             TestUtils.spyHook);
 
@@ -245,13 +270,18 @@ public class TestParentQueue {
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
     stubQueueAllocation(a, clusterResource, node_1, 2*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    root.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(PlacementSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    root.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -261,11 +291,13 @@ public class TestParentQueue {
     stubQueueAllocation(b, clusterResource, node_0, 2*GB);
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    root.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -277,9 +309,9 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -287,13 +319,17 @@ public class TestParentQueue {
     // since A has 3/6G while B has 8/14G
     stubQueueAllocation(a, clusterResource, node_1, 1*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    root.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    root.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -430,9 +466,9 @@ public class TestParentQueue {
     setupMultiLevelQueues(csConf);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, 
-            CapacitySchedulerConfiguration.ROOT, queues, queues, 
+    CSQueue root =
+        CapacityScheduler.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT, queues, queues,
             TestUtils.spyHook);
     
     // Setup some nodes
@@ -496,6 +532,8 @@ public class TestParentQueue {
     stubQueueAllocation(c, clusterResource, node_1, 0*GB);
     root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyAllocationToQueue(clusterResource, 4*GB,
+        b);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -506,15 +544,27 @@ public class TestParentQueue {
     stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
     stubQueueAllocation(c, clusterResource, node_0, 2*GB);
+
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, c, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 1*GB, a);
+
+    root.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2*GB, root);
+
+    root.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2*GB, b);
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -533,17 +583,28 @@ public class TestParentQueue {
     stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
     stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
     stubQueueAllocation(c, clusterResource, node_2, 1*GB);
-    root.assignContainers(clusterResource, node_2, 
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    root.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, a2, a1, b, c);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2*GB, a);
+
+    root.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2*GB, b);
+
+    root.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -614,27 +675,25 @@ public class TestParentQueue {
   public void testOffSwitchScheduling() throws Exception {
     // Setup queue configs
     setupSingleLevelQueues(csConf);
-    csConf.setOffSwitchPerHeartbeatLimit(2);
 
-    
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, 
-            CapacitySchedulerConfiguration.ROOT, queues, queues, 
+    CSQueue root =
+        CapacityScheduler.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT, queues, queues,
             TestUtils.spyHook);
 
     // Setup some nodes
     final int memoryPerNode = 10;
     final int coresPerNode = 16;
     final int numNodes = 2;
-    
-    FiCaSchedulerNode node_0 = 
+
+    FiCaSchedulerNode node_0 =
         TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
-    FiCaSchedulerNode node_1 = 
+    FiCaSchedulerNode node_1 =
         TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
-    
-    final Resource clusterResource = 
-        Resources.createResource(numNodes * (memoryPerNode*GB), 
+
+    final Resource clusterResource =
+        Resources.createResource(numNodes * (memoryPerNode*GB),
             numNodes * coresPerNode);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
@@ -644,50 +703,46 @@ public class TestParentQueue {
     a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
-    .incPending(Resources.createResource(1 * GB));
-    
-    // Simulate returning 2 containers on node_0 before offswitch limit
-    stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+        .incPending(Resources.createResource(1 * GB));
 
-    root.assignContainers(clusterResource, node_0, 
+    // Simulate B returning a container on node_0
+    stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
+    stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    InOrder allocationOrder = inOrder(a, b);
-    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 1*GB, clusterResource);
+    verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
-    
+
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
     // also, B gets a scheduling opportunity since A allocates RACK_LOCAL
     stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, 
+    root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder = inOrder(a, b);
-    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 3*GB, clusterResource);
+    InOrder allocationOrder = inOrder(a);
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    root.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    allocationOrder = inOrder(b);
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
-    
-    // Now, B should get the scheduling opportunity 
-    // since A has 2/6G while B has 2/14G, 
-    // A also gets an opportunity because offswitchlimit not reached
+
+    // Now, B should get the scheduling opportunity
+    // since A has 2/6G while B has 2/14G,
+    // However, since B returns off-switch, A won't get an opportunity
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
-
-    root.assignContainers(clusterResource, node_0, 
+    root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 4*GB, clusterResource);
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
+    verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
   }
@@ -743,11 +798,13 @@ public class TestParentQueue {
     stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    root.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(b2, b3);
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -760,9 +817,9 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b3, b2);
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+        any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 8fe85c9..f6caa50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -30,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -59,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -190,6 +194,15 @@ public class TestReservations {
   }
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
+    ParentQueue parent = (ParentQueue) queue.getParent();
+
+    if (parent != null) {
+      // Stub out parent queue's accept and apply.
+      doReturn(true).when(parent).accept(any(Resource.class),
+          any(ResourceCommitRequest.class));
+      doNothing().when(parent).apply(any(Resource.class),
+          any(ResourceCommitRequest.class));
+    }
     return queue;
   }
 
@@ -239,6 +252,12 @@ public class TestReservations {
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
         8 * GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
@@ -268,8 +287,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -280,8 +301,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(5 * GB, a.getUsedResources().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -292,8 +315,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -308,8 +333,10 @@ public class TestReservations {
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -325,8 +352,10 @@ public class TestReservations {
         toSchedulerKey(priorityReduce)));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_2,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(18 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -343,8 +372,10 @@ public class TestReservations {
 
     // node_1 heartbeat and unreserves from node_0 in order to allocate
     // on node_1
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(18 * GB, a.getUsedResources().getMemorySize());
     assertEquals(18 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -412,6 +443,12 @@ public class TestReservations {
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     cs.getNodeTracker().addNode(node_0);
     cs.getNodeTracker().addNode(node_1);
     cs.getNodeTracker().addNode(node_2);
@@ -434,8 +471,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize());
@@ -446,8 +485,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(4 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
@@ -467,8 +508,10 @@ public class TestReservations {
             priorityMap, recordFactory)));
 
     // add a reservation for app_0
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(12 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
@@ -481,8 +524,10 @@ public class TestReservations {
 
     // next assignment is beyond user limit for user_0 but it should assign to
     // app_1 for user_1
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(14 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(4 * GB, app_1.getCurrentConsumption().getMemorySize());
@@ -544,6 +589,12 @@ public class TestReservations {
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
         8 * GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
@@ -569,8 +620,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -581,8 +634,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(5 * GB, a.getUsedResources().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -593,8 +648,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -609,8 +666,10 @@ public class TestReservations {
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -626,8 +685,10 @@ public class TestReservations {
         toSchedulerKey(priorityReduce)));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_2,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(18 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -644,8 +705,10 @@ public class TestReservations {
 
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
     // if AM doesn't handle
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(18 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -706,6 +769,12 @@ public class TestReservations {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
         8 * GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
     cs.getNodeTracker().addNode(node_0);
     cs.getNodeTracker().addNode(node_1);
 
@@ -733,8 +802,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -744,8 +815,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(5 * GB, a.getUsedResources().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -755,8 +828,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -770,8 +845,10 @@ public class TestReservations {
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -786,8 +863,10 @@ public class TestReservations {
         toSchedulerKey(priorityReduce)));
 
     // could allocate but told need to unreserve first
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -983,6 +1062,12 @@ public class TestReservations {
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     final int numNodes = 2;
     Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
@@ -1004,8 +1089,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1015,8 +1102,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(5 * GB, a.getUsedResources().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1026,8 +1115,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1040,8 +1131,10 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -1153,6 +1246,12 @@ public class TestReservations {
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
         8 * GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
@@ -1178,8 +1277,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1189,8 +1290,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(5 * GB, a.getUsedResources().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1200,8 +1303,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1214,8 +1319,10 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentReservation().getMemorySize());
@@ -1301,6 +1408,12 @@ public class TestReservations {
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
         8 * GB);
 
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);
+
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
@@ -1330,8 +1443,10 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(2 * GB, a.getUsedResources().getMemorySize());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1342,8 +1457,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(5 * GB, a.getUsedResources().getMemorySize());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1354,8 +1471,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1370,8 +1489,10 @@ public class TestReservations {
     // used (8G) + required (5G). It will not reserved since it has to unreserve
     // some resource. Even with continous reservation looking, we don't allow 
     // unreserve resource to reserve container.
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(Resources.createResource(10 * GB)),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1386,8 +1507,10 @@ public class TestReservations {
     // try to assign reducer (5G on node 0), but tell it's resource limits <
     // used (8G) + required (5G). It will not reserved since it has to unreserve
     // some resource. Unfortunately, there's nothing to unreserve.
-    a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_2,
+            new ResourceLimits(Resources.createResource(10 * GB)),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(8 * GB, a.getUsedResources().getMemorySize());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1400,8 +1523,10 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // let it assign 5G to node_2
-    a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_2,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(13 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1413,8 +1538,10 @@ public class TestReservations {
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
 
     // reserve 8G node_0
-    a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(21 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1428,8 +1555,10 @@ public class TestReservations {
     // try to assign (8G on node 2). No room to allocate,
     // continued to try due to having reservation above,
     // but hits queue limits so can't reserve anymore.
-    a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    TestUtils.applyResourceCommitRequest(clusterResource,
+        a.assignContainers(clusterResource, node_2,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps);
     assertEquals(21 * GB, a.getUsedResources().getMemorySize());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index a60b7ed..e34ee34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -430,4 +432,28 @@ public class TestUtils {
     req.setAllocationRequestId(allocationRequestId);
     return SchedulerRequestKey.create(req);
   }
+
+  public static void applyResourceCommitRequest(Resource clusterResource,
+      CSAssignment csAssignment,
+      final Map<NodeId, FiCaSchedulerNode> nodes,
+      final Map<ApplicationAttemptId, FiCaSchedulerApp> apps)
+      throws IOException {
+    CapacityScheduler cs = new CapacityScheduler() {
+      @Override
+      public FiCaSchedulerNode getNode(NodeId nodeId) {
+        return nodes.get(nodeId);
+      }
+
+      @Override
+      public FiCaSchedulerApp getApplicationAttempt(
+          ApplicationAttemptId applicationAttemptId) {
+        return apps.get(applicationAttemptId);
+      }
+    };
+
+    cs.setResourceCalculator(new DefaultResourceCalculator());
+
+    cs.submitResourceCommitRequest(clusterResource,
+        csAssignment);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
index a0bd951..1e61186 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -103,10 +103,6 @@ public class TestRMWebServicesSchedulerActivities
           verifyStateOfAllocations(allocations.getJSONObject(i),
               "finalAllocationState", "ALLOCATED");
           verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
-        } else {
-          verifyStateOfAllocations(allocations.getJSONObject(i),
-              "finalAllocationState", "SKIPPED");
-          verifyQueueOrder(allocations.getJSONObject(i), "root-a-b");
         }
       }
     }
@@ -409,9 +405,9 @@ public class TestRMWebServicesSchedulerActivities
       verifyStateOfAllocations(allocations, "finalAllocationState",
           "ALLOCATED");
 
-      verifyNumberOfNodes(allocations, 6);
+      verifyNumberOfNodes(allocations, 5);
 
-      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1");
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-b-b1");
     }
     finally {
       rm.stop();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message