hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [03/15] git commit: YARN-1769. CapacityScheduler: Improve reservations. Contributed by Thomas Graves
Date Mon, 29 Sep 2014 21:02:12 GMT
YARN-1769. CapacityScheduler: Improve reservations. Contributed by Thomas Graves


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9c220651
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9c220651
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9c220651

Branch: refs/heads/HDFS-6581
Commit: 9c22065109a77681bc2534063eabe8692fbcb3cd
Parents: b38e52b
Author: Jason Lowe <jlowe@apache.org>
Authored: Mon Sep 29 14:12:18 2014 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Mon Sep 29 14:12:18 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |    3 +
 .../dev-support/findbugs-exclude.xml            |   12 +
 .../scheduler/capacity/CSQueue.java             |    7 +-
 .../scheduler/capacity/CapacityScheduler.java   |   15 +-
 .../CapacitySchedulerConfiguration.java         |   18 +
 .../capacity/CapacitySchedulerContext.java      |    6 +-
 .../scheduler/capacity/LeafQueue.java           |  349 +++++-
 .../scheduler/capacity/ParentQueue.java         |   99 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |   27 +
 .../capacity/TestApplicationLimits.java         |    8 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   32 +-
 .../scheduler/capacity/TestLeafQueue.java       |  148 +--
 .../scheduler/capacity/TestParentQueue.java     |   81 +-
 .../scheduler/capacity/TestReservations.java    | 1184 ++++++++++++++++++
 14 files changed, 1760 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0a6500b..7fa83be 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -265,6 +265,9 @@ Release 2.6.0 - UNRELEASED
     YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier
     to use protobuf object as the payload. (Junping Du via jianhe)
 
+    YARN-1769. CapacityScheduler: Improve reservations (Thomas Graves via
+    jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index b1dfb1e..0e6207b 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -351,4 +351,16 @@
     <Class name="org.apache.hadoop.yarn.util.ApplicationClassLoader"/>
     <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
   </Match>
+
+   <!-- It is only changed on re-initialization; the warnings are for access from a test function. -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue" />
+    <Field name="reservationsContinueLooking" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue" />
+    <Field name="reservationsContinueLooking" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 04c2fd5..db893dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -184,10 +184,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
+   * @param needToUnreserve assign container only if it can unreserve one first
    * @return the assignment
    */
   public CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node);
+      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve);
   
   /**
    * A container assigned to the queue has completed.
@@ -200,11 +201,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    *                        container
    * @param childQueue <code>CSQueue</code> to reinsert in childQueues 
    * @param event event to be sent to the container
+   * @param sortQueues indicates whether it should re-sort the queues
    */
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, 
       RMContainer container, ContainerStatus containerStatus, 
-      RMContainerEventType event, CSQueue childQueue);
+      RMContainerEventType event, CSQueue childQueue,
+      boolean sortQueues);
 
   /**
    * Get the number of applications in the queue.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bdfc819..d847579 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -516,13 +516,13 @@ public class CapacityScheduler extends
             "Queue configuration missing child queue names for " + queueName);
       }
       queue = 
-          new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
+          new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName));
       
       // Used only for unit tests
       queue = hook.hook(queue);
     } else {
       ParentQueue parentQueue = 
-        new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
+        new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
 
       // Used only for unit tests
       queue = hook.hook(parentQueue);
@@ -922,7 +922,8 @@ public class CapacityScheduler extends
           node.getNodeID());
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      CSAssignment assignment = queue.assignContainers(clusterResource, node);
+      CSAssignment assignment = queue.assignContainers(clusterResource, node,
+          false);
       
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
@@ -933,7 +934,7 @@ public class CapacityScheduler extends
           SchedulerUtils.createAbnormalContainerStatus(
               container.getId(), 
               SchedulerUtils.UNRESERVED_CONTAINER), 
-          RMContainerEventType.RELEASED, null);
+          RMContainerEventType.RELEASED, null, true);
       }
 
     }
@@ -946,7 +947,7 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
-        root.assignContainers(clusterResource, node);
+        root.assignContainers(clusterResource, node, false);
       }
     } else {
       LOG.info("Skipping scheduling since node " + node.getNodeID() + 
@@ -1122,7 +1123,7 @@ public class CapacityScheduler extends
     // Inform the queue
     LeafQueue queue = (LeafQueue)application.getQueue();
     queue.completedContainer(clusterResource, application, node, 
-        rmContainer, containerStatus, event, null);
+        rmContainer, containerStatus, event, null, true);
 
     LOG.info("Application attempt " + application.getApplicationAttemptId()
         + " released container " + container.getId() + " on node: " + node
@@ -1138,7 +1139,7 @@ public class CapacityScheduler extends
   }
   
   @Lock(Lock.NoLock.class)
-  FiCaSchedulerNode getNode(NodeId nodeId) {
+  public FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index af6bdc3..5542ef3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -81,6 +81,13 @@ public class CapacitySchedulerConfiguration extends Configuration {
 
   @Private
   public static final String STATE = "state";
+  
+  @Private
+  public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
+      + "reservations-continue-look-all-nodes";
+  
+  @Private
+  public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
 
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@@ -308,6 +315,17 @@ public class CapacitySchedulerConfiguration extends Configuration {
         QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
   }
   
+  /*
+   * Returns whether we should continue to look at all heart beating nodes even
+   * after the reservation limit was hit. The node heart beating in could
+   * satisfy the request thus could be a better pick than waiting for the
+   * reservation to be fulfilled.  This config is refreshable.
+   */
+  public boolean getReservationContinueLook() {
+    return getBoolean(RESERVE_CONT_LOOK_ALL_NODES,
+        DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
+  }
+  
   private static String getAclKey(QueueACL acl) {
     return "acl_" + acl.toString().toLowerCase();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index a3dbc35..03a1cb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.util.Comparator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
 /**
@@ -55,4 +57,6 @@ public interface CapacitySchedulerContext {
   ResourceCalculator getResourceCalculator();
 
   Comparator<CSQueue> getQueueComparator();
+  
+  FiCaSchedulerNode getNode(NodeId nodeId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c93c5f..cdb6553 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -129,6 +130,8 @@ public class LeafQueue implements CSQueue {
   
   private final ResourceCalculator resourceCalculator;
   
+  private boolean reservationsContinueLooking;
+  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) {
     this.scheduler = cs;
@@ -202,8 +205,9 @@ public class LeafQueue implements CSQueue {
         maximumCapacity, absoluteMaxCapacity, 
         userLimit, userLimitFactor, 
         maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
-        maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
-            .getConfiguration().getNodeLocalityDelay());
+        maxActiveApplications, maxActiveApplicationsPerUser, state, acls, 
+        cs.getConfiguration().getNodeLocalityDelay(), 
+        cs.getConfiguration().getReservationContinueLook());
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("LeafQueue:" + " name=" + queueName
@@ -225,7 +229,8 @@ public class LeafQueue implements CSQueue {
       int maxApplications, float maxAMResourcePerQueuePercent,
       int maxApplicationsPerUser, int maxActiveApplications,
       int maxActiveApplicationsPerUser, QueueState state,
-      Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay)
+      Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
+      boolean continueLooking)
   {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -257,6 +262,7 @@ public class LeafQueue implements CSQueue {
     this.queueInfo.setQueueState(this.state);
     
     this.nodeLocalityDelay = nodeLocalityDelay;
+    this.reservationsContinueLooking = continueLooking;
 
     StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
@@ -321,7 +327,9 @@ public class LeafQueue implements CSQueue {
         " [= configuredState ]" + "\n" +
         "acls = " + aclsString +
         " [= configuredAcls ]" + "\n" + 
-        "nodeLocalityDelay = " +  nodeLocalityDelay + "\n");
+        "nodeLocalityDelay = " +  nodeLocalityDelay + "\n" +
+        "reservationsContinueLooking = " +
+        reservationsContinueLooking + "\n");
   }
   
   @Override
@@ -555,6 +563,11 @@ public class LeafQueue implements CSQueue {
     return nodeLocalityDelay;
   }
   
+  @Private
+  boolean getReservationContinueLooking() {
+    return reservationsContinueLooking;
+  }
+  
   public String toString() {
     return queueName + ": " + 
         "capacity=" + capacity + ", " + 
@@ -613,7 +626,8 @@ public class LeafQueue implements CSQueue {
         newlyParsedLeafQueue.getMaximumActiveApplications(), 
         newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
-        newlyParsedLeafQueue.getNodeLocalityDelay());
+        newlyParsedLeafQueue.getNodeLocalityDelay(),
+        newlyParsedLeafQueue.reservationsContinueLooking);
 
     // queue metrics are updated, more resource may be available
     // activate the pending applications if possible
@@ -802,8 +816,8 @@ public class LeafQueue implements CSQueue {
   private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
   
   @Override
-  public synchronized CSAssignment 
-  assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
+  public synchronized CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, boolean needToUnreserve) {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
@@ -848,9 +862,17 @@ public class LeafQueue implements CSQueue {
           Resource required = anyRequest.getCapability();
 
           // Do we need containers at this 'priority'?
-          if (!needContainers(application, priority, required)) {
+          if (application.getTotalRequiredResources(priority) <= 0) {
             continue;
           }
+          if (!this.reservationsContinueLooking) {
+            if (!needContainers(application, priority, required)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("doesn't need containers based on reservation algo!");
+              }
+              continue;
+            }
+          }
 
           // Compute user-limit & set headroom
           // Note: We compute both user-limit & headroom with the highest 
@@ -862,14 +884,14 @@ public class LeafQueue implements CSQueue {
                   required);          
           
           // Check queue max-capacity limit
-          if (!assignToQueue(clusterResource, required)) {
+          if (!assignToQueue(clusterResource, required, application, true)) {
             return NULL_ASSIGNMENT;
           }
 
           // Check user limit
-          if (!assignToUser(
-              clusterResource, application.getUser(), userLimit)) {
-            break; 
+          if (!assignToUser(clusterResource, application.getUser(), userLimit,
+              application, true)) {
+            break;
           }
 
           // Inform the application it is about to get a scheduling opportunity
@@ -878,7 +900,7 @@ public class LeafQueue implements CSQueue {
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
-                null);
+                null, needToUnreserve);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -900,6 +922,9 @@ public class LeafQueue implements CSQueue {
             // otherwise the app will be delayed for each non-local assignment.
             // This helps apps with many off-cluster requests schedule faster.
             if (assignment.getType() != NodeType.OFF_SWITCH) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Resetting scheduling opportunities");
+              }
               application.resetSchedulingOpportunities(priority);
             }
             
@@ -935,22 +960,57 @@ public class LeafQueue implements CSQueue {
 
     // Try to assign if we have sufficient resources
     assignContainersOnNode(clusterResource, node, application, priority, 
-        rmContainer);
+        rmContainer, false);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
   }
 
-  private synchronized boolean assignToQueue(Resource clusterResource, 
-      Resource required) {
+
+  @Private
+  protected synchronized boolean assignToQueue(Resource clusterResource, 
+      Resource required, FiCaSchedulerApp application, 
+      boolean checkReservations) {
+
+    Resource potentialTotalResource = Resources.add(usedResources, required);
     // Check how of the cluster's absolute capacity we are currently using...
-    float potentialNewCapacity = 
-        Resources.divide(
-            resourceCalculator, clusterResource, 
-            Resources.add(usedResources, required), 
-            clusterResource);
+    float potentialNewCapacity = Resources.divide(resourceCalculator,
+        clusterResource, potentialTotalResource, clusterResource);
     if (potentialNewCapacity > absoluteMaxCapacity) {
+      // if enabled, check to see if we could potentially use this node instead
+      // of a reserved node if the application has reserved containers
+      if (this.reservationsContinueLooking && checkReservations) {
+
+        float potentialNewWithoutReservedCapacity = Resources.divide(
+            resourceCalculator,
+            clusterResource,
+            Resources.subtract(potentialTotalResource,
+                application.getCurrentReservation()),
+             clusterResource);
+
+        if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("try to use reserved: "
+                + getQueueName()
+                + " usedResources: "
+                + usedResources
+                + " clusterResources: "
+                + clusterResource
+                + " reservedResources: "
+                + application.getCurrentReservation()
+                + " currentCapacity "
+                + Resources.divide(resourceCalculator, clusterResource,
+                    usedResources, clusterResource) + " required " + required
+                + " potentialNewWithoutReservedCapacity: "
+                + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
+                + absoluteMaxCapacity + ")");
+          }
+          // we could potentially use this node instead of reserved node
+          return true;
+        }
+
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug(getQueueName()
             + " usedResources: " + usedResources
@@ -966,6 +1026,8 @@ public class LeafQueue implements CSQueue {
     return true;
   }
 
+
+
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   private Resource computeUserLimitAndSetHeadroom(
       FiCaSchedulerApp application, Resource clusterResource, Resource required) {
@@ -1085,25 +1147,43 @@ public class LeafQueue implements CSQueue {
     return limit;
   }
   
-  private synchronized boolean assignToUser(Resource clusterResource,
-      String userName, Resource limit) {
+  @Private
+  protected synchronized boolean assignToUser(Resource clusterResource,
+      String userName, Resource limit, FiCaSchedulerApp application,
+      boolean checkReservations) {
 
     User user = getUser(userName);
-    
+
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
-    if (Resources.greaterThan(resourceCalculator, clusterResource, 
-            user.getConsumedResources(), limit)) {
+    if (Resources.greaterThan(resourceCalculator, clusterResource,
+        user.getConsumedResources(), limit)) {
+
+      // if enabled, check to see if we could potentially use this node instead
+      // of a reserved node if the application has reserved containers
+      if (this.reservationsContinueLooking && checkReservations) {
+        if (Resources.lessThanOrEqual(
+            resourceCalculator,
+            clusterResource,
+            Resources.subtract(user.getConsumedResources(),
+                application.getCurrentReservation()), limit)) {
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("User " + userName + " in queue " + getQueueName()
+                + " will exceed limit based on reservations - " + " consumed: "
+                + user.getConsumedResources() + " reserved: "
+                + application.getCurrentReservation() + " limit: " + limit);
+          }
+          return true;
+        }
+      }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("User " + userName + " in queue " + getQueueName() + 
-            " will exceed limit - " +  
-            " consumed: " + user.getConsumedResources() + 
-            " limit: " + limit
-        );
+        LOG.debug("User " + userName + " in queue " + getQueueName()
+            + " will exceed limit - " + " consumed: "
+            + user.getConsumedResources() + " limit: " + limit);
       }
       return false;
     }
-
     return true;
   }
 
@@ -1139,7 +1219,7 @@ public class LeafQueue implements CSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource, 
       FiCaSchedulerNode node, FiCaSchedulerApp application, 
-      Priority priority, RMContainer reservedContainer) {
+      Priority priority, RMContainer reservedContainer, boolean needToUnreserve) {
 
     Resource assigned = Resources.none();
 
@@ -1149,7 +1229,7 @@ public class LeafQueue implements CSQueue {
     if (nodeLocalResourceRequest != null) {
       assigned = 
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
-              node, application, priority, reservedContainer); 
+              node, application, priority, reservedContainer, needToUnreserve); 
       if (Resources.greaterThan(resourceCalculator, clusterResource, 
           assigned, Resources.none())) {
         return new CSAssignment(assigned, NodeType.NODE_LOCAL);
@@ -1166,7 +1246,7 @@ public class LeafQueue implements CSQueue {
       
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
-              node, application, priority, reservedContainer);
+              node, application, priority, reservedContainer, needToUnreserve);
       if (Resources.greaterThan(resourceCalculator, clusterResource, 
           assigned, Resources.none())) {
         return new CSAssignment(assigned, NodeType.RACK_LOCAL);
@@ -1183,21 +1263,99 @@ public class LeafQueue implements CSQueue {
 
       return new CSAssignment(
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-              node, application, priority, reservedContainer), 
+              node, application, priority, reservedContainer, needToUnreserve), 
               NodeType.OFF_SWITCH);
     }
     
     return SKIP_ASSIGNMENT;
   }
 
-  private Resource assignNodeLocalContainers(
-      Resource clusterResource, ResourceRequest nodeLocalResourceRequest, 
-      FiCaSchedulerNode node, FiCaSchedulerApp application, 
-      Priority priority, RMContainer reservedContainer) {
+  @Private
+  protected boolean findNodeToUnreserve(Resource clusterResource,
+      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
+      Resource capability) {
+    // need to unreserve some other container first
+    NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
+    if (idToUnreserve == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("checked to see if could unreserve for app but nothing "
+            + "reserved that matches for this app");
+      }
+      return false;
+    }
+    FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
+    if (nodeToUnreserve == null) {
+      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
+      return false;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("unreserving for app: " + application.getApplicationId()
+        + " on nodeId: " + idToUnreserve
+        + " in order to replace reserved application and place it on node: "
+        + node.getNodeID() + " needing: " + capability);
+    }
+
+    // headroom
+    Resources.addTo(application.getHeadroom(), nodeToUnreserve
+        .getReservedContainer().getReservedResource());
+
+    // Make sure to not have completedContainers sort the queues here since
+    // we are already inside an iterator loop for the queues and this would
+    // cause a concurrent modification exception.
+    completedContainer(clusterResource, application, nodeToUnreserve,
+        nodeToUnreserve.getReservedContainer(),
+        SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
+            .getReservedContainer().getContainerId(),
+            SchedulerUtils.UNRESERVED_CONTAINER),
+        RMContainerEventType.RELEASED, null, false);
+    return true;
+  }
+
+  @Private
+  protected boolean checkLimitsToReserve(Resource clusterResource,
+      FiCaSchedulerApp application, Resource capability,
+      boolean needToUnreserve) {
+    if (needToUnreserve) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("we needed to unreserve to be able to allocate");
+      }
+      return false;
+    }
+
+    // we can't reserve if we got here based on the limit
+    // checks assuming we could unreserve!!!
+    Resource userLimit = computeUserLimitAndSetHeadroom(application,
+        clusterResource, capability);
+
+    // Check queue max-capacity limit
+    if (!assignToQueue(clusterResource, capability, application, false)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("was going to reserve but hit queue limit");
+      }
+      return false;
+    }
+
+    // Check user limit
+    if (!assignToUser(clusterResource, application.getUser(), userLimit,
+        application, false)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("was going to reserve but hit user limit");
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  private Resource assignNodeLocalContainers(Resource clusterResource,
+      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority, 
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer);
+      return assignContainer(clusterResource, node, application, priority,
+          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
+          needToUnreserve);
     }
     
     return Resources.none();
@@ -1206,11 +1364,12 @@ public class LeafQueue implements CSQueue {
   private Resource assignRackLocalContainers(
       Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
         reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority, 
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer);
+      return assignContainer(clusterResource, node, application, priority,
+          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
+          needToUnreserve);
     }
     
     return Resources.none();
@@ -1219,11 +1378,12 @@ public class LeafQueue implements CSQueue {
   private Resource assignOffSwitchContainers(
       Resource clusterResource, ResourceRequest offSwitchResourceRequest,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
         reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority, 
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
+      return assignContainer(clusterResource, node, application, priority,
+          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+          needToUnreserve);
     }
     
     return Resources.none();
@@ -1303,14 +1463,17 @@ public class LeafQueue implements CSQueue {
     return container;
   }
 
+
   private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
       FiCaSchedulerApp application, Priority priority, 
-      ResourceRequest request, NodeType type, RMContainer rmContainer) {
+      ResourceRequest request, NodeType type, RMContainer rmContainer,
+      boolean needToUnreserve) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
         + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type);
+        + " request=" + request + " type=" + type
+        + " needToUnreserve= " + needToUnreserve);
     }
     Resource capability = request.getCapability();
     Resource available = node.getAvailableResource();
@@ -1335,6 +1498,18 @@ public class LeafQueue implements CSQueue {
       return Resources.none();
     }
 
+    // default to true: if the reservations-continue-look feature isn't on,
+    // needContainers is checked earlier and we wouldn't have gotten this far
+    boolean canAllocContainer = true;
+    if (this.reservationsContinueLooking) {
+      // based on reservations can we allocate/reserve more or do we need
+      // to unreserve one first
+      canAllocContainer = needContainers(application, priority, capability);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("can alloc container is: " + canAllocContainer);
+      }
+    }
+
     // Can we allocate a container on this node?
     int availableContainers = 
         resourceCalculator.computeAvailableContainers(available, capability);
@@ -1342,8 +1517,28 @@ public class LeafQueue implements CSQueue {
       // Allocate...
 
       // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null){
+      if (rmContainer != null) {
         unreserve(application, priority, node, rmContainer);
+      } else if (this.reservationsContinueLooking
+          && (!canAllocContainer || needToUnreserve)) {
+        // need to unreserve some other container first
+        boolean res = findNodeToUnreserve(clusterResource, node, application,
+            priority, capability);
+        if (!res) {
+          return Resources.none();
+        }
+      } else {
+        // we got here by possibly ignoring queue capacity limits. If the
+        // parameter needToUnreserve is true it means we ignored one of those
+        // limits in the chance we could unreserve. If we are here we aren't
+        // trying to unreserve so we can't allocate anymore due to that parent
+        // limit.
+        if (needToUnreserve) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("we needed to unreserve to be able to allocate, skipping");
+          }
+          return Resources.none();
+        }
       }
 
       // Inform the application
@@ -1366,17 +1561,38 @@ public class LeafQueue implements CSQueue {
 
       return container.getResource();
     } else {
-      // Reserve by 'charging' in advance...
-      reserve(application, priority, node, rmContainer, container);
+      // if we are allowed to allocate but this node doesn't have space, reserve it or
+      // if this was already a reserved container, reserve it again
+      if ((canAllocContainer) || (rmContainer != null)) {
+
+        if (reservationsContinueLooking) {
+          // we got here by possibly ignoring parent queue capacity limits. If
+          // the parameter needToUnreserve is true it means we ignored one of
+          // those limits in the chance we could unreserve. If we are here
+          // we aren't trying to unreserve so we can't allocate
+          // anymore due to that parent limit
+          boolean res = checkLimitsToReserve(clusterResource, application, capability, 
+              needToUnreserve);
+          if (!res) {
+            return Resources.none();
+          }
+        }
 
-      LOG.info("Reserved container " + 
-          " application attempt=" + application.getApplicationAttemptId() +
-          " resource=" + request.getCapability() + 
-          " queue=" + this.toString() + 
-          " node=" + node +
-          " clusterResource=" + clusterResource);
+        // Reserve by 'charging' in advance...
+        reserve(application, priority, node, rmContainer, container);
+
+        LOG.info("Reserved container " + 
+            " application=" + application.getApplicationId() + 
+            " resource=" + request.getCapability() + 
+            " queue=" + this.toString() + 
+            " usedCapacity=" + getUsedCapacity() + 
+            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + 
+            " used=" + usedResources +
+            " cluster=" + clusterResource);
 
-      return request.getCapability();
+        return request.getCapability();
+      }
+      return Resources.none();
     }
   }
 
@@ -1402,8 +1618,8 @@ public class LeafQueue implements CSQueue {
       node.unreserveResource(application);
 
       // Update reserved metrics
-      getMetrics().unreserveResource(
-          application.getUser(), rmContainer.getContainer().getResource());
+      getMetrics().unreserveResource(application.getUser(),
+          rmContainer.getContainer().getResource());
       return true;
     }
     return false;
@@ -1412,7 +1628,8 @@ public class LeafQueue implements CSQueue {
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
-      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
+      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
+      boolean sortQueues) {
     if (application != null) {
 
       boolean removed = false;
@@ -1449,7 +1666,7 @@ public class LeafQueue implements CSQueue {
       if (removed) {
         // Inform the parent queue _outside_ of the leaf-queue lock
         getParent().completedContainer(clusterResource, application, node,
-          rmContainer, null, event, this);
+          rmContainer, null, event, this, sortQueues);
       }
     }
   }
@@ -1466,6 +1683,8 @@ public class LeafQueue implements CSQueue {
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource);
+    // Note this is a bit unconventional since it gets the object and modifies it here
+    // rather than using a set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
     
@@ -1585,7 +1804,7 @@ public class LeafQueue implements CSQueue {
 
     public synchronized void releaseContainer(Resource resource) {
       Resources.subtractFrom(consumed, resource);
-    }
+    }  
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 8c654b7..aa74be1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -100,6 +100,8 @@ public class ParentQueue implements CSQueue {
     RecordFactoryProvider.getRecordFactory(null);
 
   private final ResourceCalculator resourceCalculator;
+
+  private boolean reservationsContinueLooking;
   
   public ParentQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) {
@@ -146,7 +148,8 @@ public class ParentQueue implements CSQueue {
 
     setupQueueConfigs(cs.getClusterResource(),
         capacity, absoluteCapacity, 
-        maximumCapacity, absoluteMaxCapacity, state, acls);
+        maximumCapacity, absoluteMaxCapacity, state, acls,
+        cs.getConfiguration().getReservationContinueLook());
     
     this.queueComparator = cs.getQueueComparator();
     this.childQueues = new TreeSet<CSQueue>(queueComparator);
@@ -160,7 +163,8 @@ public class ParentQueue implements CSQueue {
       Resource clusterResource,
       float capacity, float absoluteCapacity, 
       float maximumCapacity, float absoluteMaxCapacity,
-      QueueState state, Map<QueueACL, AccessControlList> acls
+      QueueState state, Map<QueueACL, AccessControlList> acls,
+      boolean continueLooking
   ) {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -180,6 +184,8 @@ public class ParentQueue implements CSQueue {
     this.queueInfo.setMaximumCapacity(this.maximumCapacity);
     this.queueInfo.setQueueState(this.state);
 
+    this.reservationsContinueLooking = continueLooking;
+
     StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
       aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
@@ -195,7 +201,8 @@ public class ParentQueue implements CSQueue {
         ", maxCapacity=" + maximumCapacity +
         ", asboluteMaxCapacity=" + absoluteMaxCapacity + 
         ", state=" + state +
-        ", acls=" + aclsString);
+        ", acls=" + aclsString +
+        ", reservationsContinueLooking=" + reservationsContinueLooking);
   }
 
   private static float PRECISION = 0.0005f; // 0.05% precision
@@ -383,7 +390,8 @@ public class ParentQueue implements CSQueue {
         newlyParsedParentQueue.maximumCapacity, 
         newlyParsedParentQueue.absoluteMaxCapacity,
         newlyParsedParentQueue.state, 
-        newlyParsedParentQueue.acls);
+        newlyParsedParentQueue.acls,
+        newlyParsedParentQueue.reservationsContinueLooking);
 
     // Re-configure existing child queues and add new ones
     // The CS has already checked to ensure all existing child queues are present!
@@ -551,7 +559,7 @@ public class ParentQueue implements CSQueue {
 
   @Override
   public synchronized CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node) {
+      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -561,14 +569,19 @@ public class ParentQueue implements CSQueue {
           + getQueueName());
       }
       
+      boolean localNeedToUnreserve = false;
       // Are we over maximum-capacity for this queue?
       if (!assignToQueue(clusterResource)) {
-        break;
+        // check to see if we could if we unreserve first
+        localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
+        if (!localNeedToUnreserve) {
+          break;
+        }
       }
       
       // Schedule
       CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node);
+          assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -632,6 +645,39 @@ public class ParentQueue implements CSQueue {
     return true;
 
   }
+
+  
+  private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
+    if (this.reservationsContinueLooking) {      
+      // check to see if we could potentially use this node instead of a reserved
+      // node
+
+      Resource reservedResources = Resources.createResource(getMetrics()
+          .getReservedMB(), getMetrics().getReservedVirtualCores());
+      float capacityWithoutReservedCapacity = Resources.divide(
+          resourceCalculator, clusterResource,
+          Resources.subtract(usedResources, reservedResources),
+          clusterResource);
+
+      if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("parent: try to use reserved: " + getQueueName()
+            + " usedResources: " + usedResources.getMemory()
+            + " clusterResources: " + clusterResource.getMemory()
+            + " reservedResources: " + reservedResources.getMemory()
+            + " currentCapacity " + ((float) usedResources.getMemory())
+            / clusterResource.getMemory()
+            + " potentialNewWithoutReservedCapacity: "
+            + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+            + absoluteMaxCapacity + ")");
+        }
+        // we could potentially use this node instead of reserved node
+        return true;
+      }
+    }
+    return false;
+   }
+
   
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) && 
@@ -640,7 +686,7 @@ public class ParentQueue implements CSQueue {
   }
   
   synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
-      FiCaSchedulerNode node) {
+      FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -653,7 +699,7 @@ public class ParentQueue implements CSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
       }
-      assignment = childQueue.assignContainers(cluster, node);
+      assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 
@@ -697,7 +743,8 @@ public class ParentQueue implements CSQueue {
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, 
       RMContainer rmContainer, ContainerStatus containerStatus, 
-      RMContainerEventType event, CSQueue completedChildQueue) {
+      RMContainerEventType event, CSQueue completedChildQueue,
+      boolean sortQueues) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
@@ -713,16 +760,21 @@ public class ParentQueue implements CSQueue {
             " cluster=" + clusterResource);
       }
 
-      // reinsert the updated queue
-      for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
-        CSQueue csqueue = iter.next();
-        if(csqueue.equals(completedChildQueue))
-        {
-          iter.remove();
-          LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() + 
-              " stats: " + csqueue);
-          childQueues.add(csqueue);
-          break;
+      // Note that this is using an iterator on the childQueues so this can't be
+      // called if already within an iterator for the childQueues, e.g.
+      // from assignContainersToChildQueues.
+      if (sortQueues) {
+        // reinsert the updated queue
+        for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
+          CSQueue csqueue = iter.next();
+          if(csqueue.equals(completedChildQueue))
+          {
+            iter.remove();
+            LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() + 
+                " stats: " + csqueue);
+            childQueues.add(csqueue);
+            break;
+          }
         }
       }
       
@@ -730,10 +782,15 @@ public class ParentQueue implements CSQueue {
       if (parent != null) {
         // complete my parent
         parent.completedContainer(clusterResource, application, 
-            node, rmContainer, null, event, this);
+            node, rmContainer, null, event, this, sortQueues);
       }    
     }
   }
+
+  @Private
+  boolean getReservationContinueLooking() {
+    return reservationsContinueLooking;
+  }
   
   synchronized void allocateResource(Resource clusterResource, 
       Resource resource) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 167dcd8..dc0d0f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -254,5 +254,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       currentContPreemption, Collections.singletonList(rr),
       allocation.getNMTokenList());
   }
+  
+  synchronized public NodeId getNodeIdToUnreserve(Priority priority,
+      Resource capability) {
+
+    // for this first version, keep the algorithm simple and just grab the first
+    // reservation that has enough resources
+    Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
+        .get(priority);
+
+    if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
+      for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
+        // make sure we unreserve one with at least the same amount of
+        // resources, otherwise could affect capacity limits
+        if (Resources.fitsIn(capability, entry.getValue().getContainer()
+            .getResource())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("unreserving node with reservation size: "
+                + entry.getValue().getContainer().getResource()
+                + " in order to allocate container with size: " + capability);
+          }
+          return entry.getKey();
+        }
+      }
+    }
+    return null;
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index a9a9975..ff8e873 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -516,7 +516,7 @@ public class TestApplicationLimits {
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0);
+    queue.assignContainers(clusterResource, node_0, false);
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     verify(app_0_0).setHeadroom(eq(expectedHeadroom));
 
@@ -535,7 +535,7 @@ public class TestApplicationLimits {
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
     verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
     verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
     
@@ -554,7 +554,7 @@ public class TestApplicationLimits {
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     verify(app_0_0).setHeadroom(eq(expectedHeadroom));
     verify(app_0_1).setHeadroom(eq(expectedHeadroom));
@@ -562,7 +562,7 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
-    queue.assignContainers(clusterResource, node_0); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     verify(app_0_0).setHeadroom(eq(expectedHeadroom));
     verify(app_0_1).setHeadroom(eq(expectedHeadroom));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 66ec0e6..fdb9028 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -141,7 +142,7 @@ public class TestChildQueueOrder {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
-          when(queue).assignContainers(eq(clusterResource), eq(node));
+          when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -152,7 +153,7 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node));
+    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
     doNothing().when(node).releaseContainer(any(Container.class));
   }
 
@@ -244,7 +245,6 @@ public class TestChildQueueOrder {
     doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
         any(ContainerStatus.class),any(RMContainerEventType.class));
 
-    //
     Priority priority = TestUtils.createMockPriority(1); 
     ContainerAllocationExpirer expirer = 
       mock(ContainerAllocationExpirer.class);
@@ -269,14 +269,14 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     for(int i=0; i < 2; i++)
     {
       stubQueueAllocation(a, clusterResource, node_0, 0*GB);
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     } 
     for(int i=0; i < 3; i++)
     {
@@ -284,7 +284,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }  
     for(int i=0; i < 4; i++)
     {
@@ -292,7 +292,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -305,7 +305,7 @@ public class TestChildQueueOrder {
     for(int i=0; i < 3;i++)
     {
       d.completedContainer(clusterResource, app_0, node_0,
-          rmContainer, null, RMContainerEventType.KILL, null);
+          rmContainer, null, RMContainerEventType.KILL, null, true);
     }
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -325,7 +325,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0);
+      root.assignContainers(clusterResource, node_0, false);
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -336,7 +336,7 @@ public class TestChildQueueOrder {
 
     //Release 1GB Container from A
     a.completedContainer(clusterResource, app_0, node_0, 
-        rmContainer, null, RMContainerEventType.KILL, null);
+        rmContainer, null, RMContainerEventType.KILL, null, true);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -352,7 +352,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -362,7 +362,7 @@ public class TestChildQueueOrder {
 
     //Release 1GB container resources from B
     b.completedContainer(clusterResource, app_0, node_0, 
-        rmContainer, null, RMContainerEventType.KILL, null);
+        rmContainer, null, RMContainerEventType.KILL, null, true);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -378,7 +378,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -392,12 +392,12 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0);
+    root.assignContainers(clusterResource, node_0, false);
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class));
+        any(FiCaSchedulerNode.class), anyBoolean());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class));
+        any(FiCaSchedulerNode.class), anyBoolean());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);


Mime
View raw message