hadoop-common-commits mailing list archives

From: ka...@apache.org
Subject: hadoop git commit: YARN-3655. FairScheduler: potential livelock due to maxAMShare limitation and container reservation. (Zhihai Xu via kasha)
Date: Sun, 07 Jun 2015 18:37:58 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk b61b48999 -> bd69ea408


YARN-3655. FairScheduler: potential livelock due to maxAMShare limitation and container reservation.
(Zhihai Xu via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bd69ea40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bd69ea40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bd69ea40

Branch: refs/heads/trunk
Commit: bd69ea408f8fdd8293836ce1089fe9b01616f2f7
Parents: b61b489
Author: Karthik Kambatla <kasha@apache.org>
Authored: Sun Jun 7 11:37:52 2015 -0700
Committer: Karthik Kambatla <kasha@apache.org>
Committed: Sun Jun 7 11:37:52 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FSAppAttempt.java            | 128 +++++----
 .../resourcemanager/scheduler/fair/FSQueue.java |  15 +
 .../scheduler/fair/FairScheduler.java           |  42 +--
 .../scheduler/fair/TestFairScheduler.java       | 282 +++++++++++++++++++
 5 files changed, 378 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
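
For context on the bug itself: the livelock arises when an application has reserved a node for a container it can never actually launch, because launching it would push the queue past its maxAMShare limit. Before this patch the reservation was validated against node fit and queue max share but not against maxAMShare, so the reservation was held forever and the node could not be offered to any other application. A minimal, self-contained sketch of that failure mode (hypothetical simplified types and numbers; the real logic lives in FSAppAttempt and FairScheduler):

    // Pre-patch behavior, modeled with plain ints standing in for Resources.
    public class LivelockSketch {
      static int queueAmUsageMb = 7168;  // AM memory already running in the queue
      static int amLimitMb = 12800;      // maxAMShare * queue share
      static int reservedAmMb = 10240;   // AM container size the app reserved

      static boolean canRunAppAM(int amSizeMb) {
        return queueAmUsageMb + amSizeMb <= amLimitMb;
      }

      public static void main(String[] args) {
        // Every node heartbeat re-offers the reserved container to the app...
        for (int heartbeat = 1; heartbeat <= 3; heartbeat++) {
          if (!canRunAppAM(reservedAmMb)) {
            // ...allocation is skipped, but the reservation is NOT released,
            // so the node stays blocked for every other application: livelock.
            System.out.println("heartbeat " + heartbeat
                + ": over maxAMShare, reservation kept, node blocked");
          }
        }
      }
    }

The patch breaks the cycle by folding the maxAMShare check into the reservation-validity path and releasing reservations that can no longer be used.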


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd69ea40/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cc28977..d90433d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -498,6 +498,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3766. Fixed the apps table column error of generic history web UI.
     (Xuan Gong via zjshen)
 
+    YARN-3655. FairScheduler: potential livelock due to maxAMShare limitation
+    and container reservation. (Zhihai Xu via kasha)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd69ea40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6287deb..7419446 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -541,39 +541,37 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       }
 
       return container.getResource();
-    } else {
-      if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
-        return Resources.none();
-      }
+    }
 
-      // The desired container won't fit here, so reserve
-      reserve(request.getPriority(), node, container, reserved);
+    // The desired container won't fit here, so reserve
+    reserve(request.getPriority(), node, container, reserved);
 
-      return FairScheduler.CONTAINER_RESERVED;
-    }
+    return FairScheduler.CONTAINER_RESERVED;
   }
 
   private boolean hasNodeOrRackLocalRequests(Priority priority) {
     return getResourceRequests(priority).size() > 1;
   }
 
-  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
-    }
-
+  /**
+   * Whether the AM container for this app is over the maxAMShare limit.
+   */
+  private boolean isOverAMShareLimit() {
     // Check the AM resource usage for the leaf queue
     if (!isAmRunning() && !getUnmanagedAM()) {
       List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
       if (ask.isEmpty() || !getQueue().canRunAppAM(
           ask.get(0).getCapability())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping allocation because maxAMShare limit would " +
-              "be exceeded");
-        }
-        return Resources.none();
+        return true;
       }
     }
+    return false;
+  }
+
+  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
+    }
 
     Collection<Priority> prioritiesToTry = (reserved) ?
         Arrays.asList(node.getReservedContainer().getReservedPriority()) :
@@ -584,8 +582,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // (not scheduled) in order to promote better locality.
     synchronized (this) {
       for (Priority priority : prioritiesToTry) {
-        if (getTotalRequiredResources(priority) <= 0 ||
-            !hasContainerForNode(priority, node)) {
+        // Skip this for a reserved container, since
+        // we already check it in isValidReservation.
+        if (!reserved && !hasContainerForNode(priority, node)) {
           continue;
         }
 
@@ -651,41 +650,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
-   * Called when this application already has an existing reservation on the
-   * given node.  Sees whether we can turn the reservation into an allocation.
-   * Also checks whether the application needs the reservation anymore, and
-   * releases it if not.
-   *
-   * @param node
-   *     Node that the application has an existing reservation on
-   */
-  public Resource assignReservedContainer(FSSchedulerNode node) {
-    RMContainer rmContainer = node.getReservedContainer();
-    Priority priority = rmContainer.getReservedPriority();
-
-    // Make sure the application still needs requests at this priority
-    if (getTotalRequiredResources(priority) == 0) {
-      unreserve(priority, node);
-      return Resources.none();
-    }
-
-    // Fail early if the reserved container won't fit.
-    // Note that we have an assumption here that there's only one container size
-    // per priority.
-    if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
-        node.getAvailableResource())) {
-      return Resources.none();
-    }
-
-    return assignContainer(node, true);
-  }
-
-
-  /**
    * Whether this app has containers requests that could be satisfied on the
    * given node, if the node had full space.
    */
-  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
+  private boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
     ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
     ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
     ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());
@@ -703,9 +671,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
                 (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
             // The requested container must be able to fit on the node:
             Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
-                anyRequest.getCapability(), node.getRMNode().getTotalCapability());
+                anyRequest.getCapability(),
+                node.getRMNode().getTotalCapability()) &&
+            // The requested container must fit in queue maximum share:
+            getQueue().fitsInMaxShare(anyRequest.getCapability());
   }
 
+  private boolean isValidReservation(FSSchedulerNode node) {
+    Priority reservedPriority = node.getReservedContainer().
+        getReservedPriority();
+    return hasContainerForNode(reservedPriority, node) &&
+        !isOverAMShareLimit();
+  }
+
+  /**
+   * Called when this application already has an existing reservation on the
+   * given node.  Sees whether we can turn the reservation into an allocation.
+   * Also checks whether the application needs the reservation anymore, and
+   * releases it if not.
+   *
+   * @param node
+   *     Node that the application has an existing reservation on
+   * @return whether the reservation on the given node is valid.
+   */
+  public boolean assignReservedContainer(FSSchedulerNode node) {
+    RMContainer rmContainer = node.getReservedContainer();
+    Priority reservedPriority = rmContainer.getReservedPriority();
+
+    if (!isValidReservation(node)) {
+      // Don't hold the reservation if app can no longer use it
+      LOG.info("Releasing reservation that cannot be satisfied for " +
+          "application " + getApplicationAttemptId() + " on node " + node);
+      unreserve(reservedPriority, node);
+      return false;
+    }
+
+    // Reservation valid; try to fulfill the reservation
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to fulfill reservation for application "
+          + getApplicationAttemptId() + " on node: " + node);
+    }
+
+    // Fail early if the reserved container won't fit.
+    // Note that we have an assumption here that
+    // there's only one container size per priority.
+    if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
+        node.getAvailableResource())) {
+      assignContainer(node, true);
+    }
+    return true;
+  }
 
   static class RMContainerComparator implements Comparator<RMContainer>,
       Serializable {
@@ -795,6 +810,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
   @Override
   public Resource assignContainer(FSSchedulerNode node) {
+    if (isOverAMShareLimit()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping allocation because maxAMShare limit would " +
+            "be exceeded");
+      }
+      return Resources.none();
+    }
     return assignContainer(node, false);
   }
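
Taken together, the FSAppAttempt changes centralize the reservation checks: hasContainerForNode now also enforces the queue max share, isOverAMShareLimit factors out the AM check, and assignReservedContainer returns a boolean so the scheduler knows whether the node is still tied up. A condensed, runnable restatement of that flow (stand-in types, not the real YARN classes):

    // Stand-in model of the patched reservation path in FSAppAttempt.
    public class ReservationFlowSketch {
      static class Node {
        int availableMb;      // memory currently free on the node
        int reservedMb;       // size of the container reserved here
        boolean stillWanted;  // app still has a matching request for this node
      }

      static boolean overAmShareLimit;  // assumed queue-level state

      static boolean isValidReservation(Node node) {
        // Valid only if the app still wants a container here AND launching
        // it would not push the queue past maxAMShare.
        return node.stillWanted && !overAmShareLimit;
      }

      /** Mirrors the new boolean-returning assignReservedContainer. */
      static boolean assignReservedContainer(Node node) {
        if (!isValidReservation(node)) {
          System.out.println("unreserve: app can no longer use this node");
          return false;  // caller may now offer the node to other apps
        }
        if (node.reservedMb <= node.availableMb) {
          System.out.println("fulfilling reservation as an allocation");
        }
        return true;     // reservation still valid, keep holding it
      }

      public static void main(String[] args) {
        Node node = new Node();
        node.reservedMb = 10240;
        node.availableMb = 10240;
        node.stillWanted = true;
        overAmShareLimit = true;  // the app's AM would exceed maxAMShare
        System.out.println(assignReservedContainer(node));  // false: released
      }
    }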
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd69ea40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index ade2880..e488c76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -330,4 +330,19 @@ public abstract class FSQueue implements Queue, Schedulable {
   @Override
   public void decPendingResource(String nodeLabel, Resource resourceToDec) {
   }
+
+  public boolean fitsInMaxShare(Resource additionalResource) {
+    Resource usagePlusAddition =
+        Resources.add(getResourceUsage(), additionalResource);
+
+    if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
+      return false;
+    }
+
+    FSQueue parentQueue = getParent();
+    if (parentQueue != null) {
+      return parentQueue.fitsInMaxShare(additionalResource);
+    }
+    return true;
+  }
 }
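
The fitsInMaxShare check moves from a static helper in FairScheduler onto FSQueue itself, walking up the queue hierarchy so a container must fit within the max share at every level. A standalone model of that recursion (plain ints stand in for Resource):

    // Simplified model of FSQueue.fitsInMaxShare's parent-chain walk.
    public class MaxShareSketch {
      static class Queue {
        final Queue parent;
        final int maxShare;
        int usage;

        Queue(Queue parent, int maxShare) {
          this.parent = parent;
          this.maxShare = maxShare;
        }

        boolean fitsInMaxShare(int additional) {
          if (usage + additional > maxShare) {
            return false;  // this level would overflow its max share
          }
          // The container must also fit at every ancestor up to the root.
          return parent == null || parent.fitsInMaxShare(additional);
        }
      }

      public static void main(String[] args) {
        Queue root = new Queue(null, 100);
        Queue leaf = new Queue(root, 80);
        root.usage = 95;  // parent nearly full even though the leaf has room
        leaf.usage = 10;
        System.out.println(leaf.fitsInMaxShare(10));  // false: root overflows
      }
    }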

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd69ea40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 64b3f12..2ed3b2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -1075,31 +1074,12 @@ public class FairScheduler extends
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
 
+    boolean validReservation = false;
     FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
-      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      FSQueue queue = reservedAppSchedulable.getQueue();
-
-      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
-          || !fitsInMaxShare(queue,
-          node.getReservedContainer().getReservedResource())) {
-        // Don't hold the reservation if app can no longer use it
-        LOG.info("Releasing reservation that cannot be satisfied for application "
-            + reservedAppSchedulable.getApplicationAttemptId()
-            + " on node " + node);
-        reservedAppSchedulable.unreserve(reservedPriority, node);
-        reservedAppSchedulable = null;
-      } else {
-        // Reservation exists; try to fulfill the reservation
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to fulfill reservation for application "
-              + reservedAppSchedulable.getApplicationAttemptId()
-              + " on node: " + node);
-        }
-        node.getReservedAppSchedulable().assignReservedContainer(node);
-      }
+      validReservation = reservedAppSchedulable.assignReservedContainer(node);
     }
-    if (reservedAppSchedulable == null) {
+    if (!validReservation) {
       // No reservation, schedule at queue which is farthest below fair share
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {
@@ -1117,22 +1097,6 @@ public class FairScheduler extends
     updateRootQueueMetrics();
   }
 
-  static boolean fitsInMaxShare(FSQueue queue, Resource
-      additionalResource) {
-    Resource usagePlusAddition =
-        Resources.add(queue.getResourceUsage(), additionalResource);
-
-    if (!Resources.fitsIn(usagePlusAddition, queue.getMaxShare())) {
-      return false;
-    }
-    
-    FSQueue parentQueue = queue.getParent();
-    if (parentQueue != null) {
-      return fitsInMaxShare(parentQueue, additionalResource);
-    }
-    return true;
-  }
-
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }
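
On the FairScheduler side, the whole validate-or-release block collapses into one call, and follow-on scheduling is gated on the returned validity rather than on nulling out the app reference. The new shape of the node-update path, condensed into a runnable stub (hypothetical stand-in types; the real method also handles continuous scheduling and assignMultiple):

    // Condensed shape of the patched attemptScheduling path.
    public class NodeUpdateSketch {
      interface App {
        boolean assignReservedContainer(Node node);
      }

      static class Node {
        App reservedApp;  // null when nothing is reserved here
      }

      static void scheduleFromQueues(Node node) {
        System.out.println("offering node to the queue hierarchy");
      }

      static void attemptScheduling(Node node) {
        boolean validReservation = false;
        if (node.reservedApp != null) {
          // One call now validates, fulfills, or releases the reservation.
          validReservation = node.reservedApp.assignReservedContainer(node);
        }
        if (!validReservation) {
          // Stale or absent reservation: the node is free for other apps.
          scheduleFromQueues(node);
        }
      }

      public static void main(String[] args) {
        Node node = new Node();
        node.reservedApp = n -> false;  // reservation gets released
        attemptScheduling(node);        // prints the queue-offer message
      }
    }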

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd69ea40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 94fdc1a..56e8adc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3701,6 +3701,288 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         0, queue2.getAmResourceUsage().getMemory());
   }
 
+  /**
+   * The test verifies that a container is reserved when the app is not over
+   * maxAMShare, a reserved container is released when the app goes over
+   * maxAMShare, no new container is reserved while over maxAMShare, a
+   * reservation is turned into an allocation, and a superfluous reservation
+   * is released.
+   * 1. Create three nodes: Node1 is 10G, Node2 is 10G, and Node3 is 5G.
+   * 2. APP1 allocated 1G on Node1 and APP2 allocated 1G on Node2.
+   * 3. APP3 reserved 10G on Node1 and Node2.
+   * 4. APP4 allocated 5G on Node3, which makes APP3 over maxAMShare.
+   * 5. Remove APP1 to make Node1 have 10G available resource.
+   * 6. APP3 unreserved its container on Node1 because it was over maxAMShare.
+   * 7. APP5 allocated 1G on Node1 after APP3 unreserved its container.
+   * 8. Remove APP3.
+   * 9. APP6 failed to reserve a 10G container on Node1 due to the AMShare limit.
+   * 10. APP7 allocated 1G on Node1.
+   * 11. Remove APP4 and APP5.
+   * 12. APP6 reserved 10G on Node1 and Node2.
+   * 13. APP8 failed to allocate a 1G container on Node1 and Node2 because
+   *     APP6 reserved Node1 and Node2.
+   * 14. Remove APP2.
+   * 15. APP6 turned the 10G reservation into an allocation on Node2.
+   * 16. APP6 unreserved its container on Node1, APP8 allocated 1G on Node1.
+   */
+  @Test
+  public void testQueueMaxAMShareWithContainerReservation() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<maxAMShare>0.5</maxAMShare>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(10240, 10),
+            1, "127.0.0.1");
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(10240, 10),
+            2, "127.0.0.2");
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(5120, 5),
+            3, "127.0.0.3");
+    NodeAddedSchedulerEvent nodeE1 = new NodeAddedSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent updateE1 = new NodeUpdateSchedulerEvent(node1);
+    NodeAddedSchedulerEvent nodeE2 = new NodeAddedSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent updateE2 = new NodeUpdateSchedulerEvent(node2);
+    NodeAddedSchedulerEvent nodeE3 = new NodeAddedSchedulerEvent(node3);
+    NodeUpdateSchedulerEvent updateE3 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeE1);
+    scheduler.handle(nodeE2);
+    scheduler.handle(nodeE3);
+    scheduler.update();
+    FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1",
+        true);
+    Resource amResource1 = Resource.newInstance(1024, 1);
+    Resource amResource2 = Resource.newInstance(1024, 1);
+    Resource amResource3 = Resource.newInstance(10240, 1);
+    Resource amResource4 = Resource.newInstance(5120, 1);
+    Resource amResource5 = Resource.newInstance(1024, 1);
+    Resource amResource6 = Resource.newInstance(10240, 1);
+    Resource amResource7 = Resource.newInstance(1024, 1);
+    Resource amResource8 = Resource.newInstance(1024, 1);
+    int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    scheduler.update();
+    // Allocate app1's AM container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application1's AM requests 1024 MB memory",
+        1024, app1.getAMResource().getMemory());
+    assertEquals("Application1's AM should be running",
+        1, app1.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 1024 MB memory",
+        1024, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+    createApplicationWithAMResource(attId2, "queue1", "user1", amResource2);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+    scheduler.update();
+    // Allocate app2's AM container on node2.
+    scheduler.handle(updateE2);
+    assertEquals("Application2's AM requests 1024 MB memory",
+        1024, app2.getAMResource().getMemory());
+    assertEquals("Application2's AM should be running",
+        1, app2.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
+    createApplicationWithAMResource(attId3, "queue1", "user1", amResource3);
+    createSchedulingRequestExistingApplication(10240, 1, amPriority, attId3);
+    FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
+    scheduler.update();
+    // app3 reserves a container on node1 because node1's available resource
+    // is less than app3's AM container resource.
+    scheduler.handle(updateE1);
+    // Similarly app3 reserves a container on node2.
+    scheduler.handle(updateE2);
+    assertEquals("Application3's AM resource shouldn't be updated",
+        0, app3.getAMResource().getMemory());
+    assertEquals("Application3's AM should not be running",
+        0, app3.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
+    createApplicationWithAMResource(attId4, "queue1", "user1", amResource4);
+    createSchedulingRequestExistingApplication(5120, 1, amPriority, attId4);
+    FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
+    scheduler.update();
+    // app4 can't allocate its AM container on node1 because
+    // app3 already reserved its container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application4's AM resource shouldn't be updated",
+        0, app4.getAMResource().getMemory());
+    assertEquals("Application4's AM should not be running",
+        0, app4.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    scheduler.update();
+    // Allocate app4's AM container on node3.
+    scheduler.handle(updateE3);
+    assertEquals("Application4's AM requests 5120 MB memory",
+        5120, app4.getAMResource().getMemory());
+    assertEquals("Application4's AM should be running",
+        1, app4.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 7168 MB memory",
+        7168, queue1.getAmResourceUsage().getMemory());
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+        new AppAttemptRemovedSchedulerEvent(attId1,
+            RMAppAttemptState.FINISHED, false);
+    // Release app1's AM container on node1.
+    scheduler.handle(appRemovedEvent1);
+    assertEquals("Queue1's AM resource usage should be 6144 MB memory",
+        6144, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId5 = createAppAttemptId(5, 1);
+    createApplicationWithAMResource(attId5, "queue1", "user1", amResource5);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId5);
+    FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
+    scheduler.update();
+    // app5 can allocate its AM container on node1 after
+    // app3 unreserves its container on node1 for
+    // exceeding the queue MaxAMShare limit.
+    scheduler.handle(updateE1);
+    assertEquals("Application5's AM requests 1024 MB memory",
+        1024, app5.getAMResource().getMemory());
+    assertEquals("Application5's AM should be running",
+        1, app5.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 7168 MB memory",
+        7168, queue1.getAmResourceUsage().getMemory());
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
+        new AppAttemptRemovedSchedulerEvent(attId3,
+            RMAppAttemptState.FINISHED, false);
+    // Remove app3.
+    scheduler.handle(appRemovedEvent3);
+    assertEquals("Queue1's AM resource usage should be 7168 MB memory",
+        7168, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId6 = createAppAttemptId(6, 1);
+    createApplicationWithAMResource(attId6, "queue1", "user1", amResource6);
+    createSchedulingRequestExistingApplication(10240, 1, amPriority, attId6);
+    FSAppAttempt app6 = scheduler.getSchedulerApp(attId6);
+    scheduler.update();
+    // app6 can't reserve a container on node1 because
+    // it would exceed the queue MaxAMShare limit.
+    scheduler.handle(updateE1);
+    assertEquals("Application6's AM resource shouldn't be updated",
+        0, app6.getAMResource().getMemory());
+    assertEquals("Application6's AM should not be running",
+        0, app6.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 7168 MB memory",
+        7168, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId7 = createAppAttemptId(7, 1);
+    createApplicationWithAMResource(attId7, "queue1", "user1", amResource7);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId7);
+    FSAppAttempt app7 = scheduler.getSchedulerApp(attId7);
+    scheduler.update();
+    // Allocate app7's AM container on node1 to prove
+    // app6 didn't reserve a container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application7's AM requests 1024 MB memory",
+        1024, app7.getAMResource().getMemory());
+    assertEquals("Application7's AM should be running",
+        1, app7.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 8192 MB memory",
+        8192, queue1.getAmResourceUsage().getMemory());
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent4 =
+        new AppAttemptRemovedSchedulerEvent(attId4,
+            RMAppAttemptState.FINISHED, false);
+    // Release app4's AM container on node3.
+    scheduler.handle(appRemovedEvent4);
+    assertEquals("Queue1's AM resource usage should be 3072 MB memory",
+        3072, queue1.getAmResourceUsage().getMemory());
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent5 =
+        new AppAttemptRemovedSchedulerEvent(attId5,
+            RMAppAttemptState.FINISHED, false);
+    // Release app5's AM container on node1.
+    scheduler.handle(appRemovedEvent5);
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    scheduler.update();
+    // app6 reserves a container on node1 because node1's available resource
+    // is less than app6's AM container resource and
+    // app6 is not over the AMShare limit.
+    scheduler.handle(updateE1);
+    // Similarly app6 reserves a container on node2.
+    scheduler.handle(updateE2);
+
+    ApplicationAttemptId attId8 = createAppAttemptId(8, 1);
+    createApplicationWithAMResource(attId8, "queue1", "user1", amResource8);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId8);
+    FSAppAttempt app8 = scheduler.getSchedulerApp(attId8);
+    scheduler.update();
+    // app8 can't allocate a container on node1 because
+    // app6 already reserved a container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application8's AM resource shouldn't be updated",
+        0, app8.getAMResource().getMemory());
+    assertEquals("Application8's AM should not be running",
+        0, app8.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+    scheduler.update();
+    // app8 can't allocate a container on node2 because
+    // app6 already reserved a container on node2.
+    scheduler.handle(updateE2);
+    assertEquals("Application8's AM resource shouldn't be updated",
+        0, app8.getAMResource().getMemory());
+    assertEquals("Application8's AM should not be running",
+        0, app8.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
+        new AppAttemptRemovedSchedulerEvent(attId2,
+            RMAppAttemptState.FINISHED, false);
+    // Release app2's AM container on node2.
+    scheduler.handle(appRemovedEvent2);
+    assertEquals("Queue1's AM resource usage should be 1024 MB memory",
+        1024, queue1.getAmResourceUsage().getMemory());
+
+    scheduler.update();
+    // app6 turns the reservation into an allocation on node2.
+    scheduler.handle(updateE2);
+    assertEquals("Application6's AM requests 10240 MB memory",
+        10240, app6.getAMResource().getMemory());
+    assertEquals("Application6's AM should be running",
+        1, app6.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 11264 MB memory",
+        11264, queue1.getAmResourceUsage().getMemory());
+
+    scheduler.update();
+    // app6 unreserves its container on node1 because
+    // it already got a container on node2.
+    // Now app8 can allocate its AM container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application8's AM requests 1024 MB memory",
+        1024, app8.getAMResource().getMemory());
+    assertEquals("Application8's AM should be running",
+        1, app8.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 12288 MB memory",
+        12288, queue1.getAmResourceUsage().getMemory());
+  }
+
   @Test
   public void testMaxRunningAppsHierarchicalQueues() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
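
A quick way to sanity-check the assertion values in the new test: with maxAMShare 0.5 and the three nodes registering 25600 MB in total, the queue's AM limit works out to 12800 MB (assuming the limit is computed against the queue's full share of the cluster, as with a single active queue here). A small arithmetic check:

    // Arithmetic behind the test's over/under-maxAMShare transitions.
    public class MaxAmShareArithmetic {
      public static void main(String[] args) {
        int clusterMb = 10240 + 10240 + 5120;     // node1 + node2 + node3
        int amLimitMb = (int) (clusterMb * 0.5);  // maxAMShare from the XML
        System.out.println("AM limit: " + amLimitMb + " MB");  // 12800 MB
        // Step 6: 7168 MB in use + app3's 10240 MB AM -> over, unreserve.
        System.out.println(7168 + 10240 <= amLimitMb);   // false
        // Step 12: 2048 MB in use + app6's 10240 MB AM -> fits, may reserve.
        System.out.println(2048 + 10240 <= amLimitMb);   // true
      }
    }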

