hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [2/2] hadoop git commit: YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.
Date Tue, 17 Mar 2015 17:24:46 GMT
YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/487374b7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/487374b7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/487374b7

Branch: refs/heads/trunk
Commit: 487374b7fe0c92fc7eb1406c568952722b5d5b15
Parents: a89b087
Author: Jian He <jianhe@apache.org>
Authored: Tue Mar 17 10:22:15 2015 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Tue Mar 17 10:24:23 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     | 112 ++++++-
 .../scheduler/capacity/CSQueue.java             |   4 +-
 .../scheduler/capacity/CapacityScheduler.java   |  33 ++-
 .../scheduler/capacity/LeafQueue.java           | 292 +++++++------------
 .../scheduler/capacity/ParentQueue.java         | 140 +++------
 .../scheduler/common/fica/FiCaSchedulerApp.java |  16 +-
 .../capacity/TestApplicationLimits.java         |   8 +-
 .../capacity/TestCapacityScheduler.java         |  59 ++++
 .../scheduler/capacity/TestChildQueueOrder.java |  25 +-
 .../scheduler/capacity/TestLeafQueue.java       | 142 ++++-----
 .../scheduler/capacity/TestParentQueue.java     |  97 +++---
 .../scheduler/capacity/TestReservations.java    | 147 +++++-----
 13 files changed, 561 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 82934ad..f5b72d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    YARN-3243. CapacityScheduler should pass headroom from parent to children
+    to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index d800709..4e53060 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.collect.Sets;
 
 public abstract class AbstractCSQueue implements CSQueue {
+  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
   
   CSQueue parent;
   final String queueName;
@@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
                                         parentQ.getPreemptionDisabled());
   }
   
-  protected Resource getCurrentResourceLimit(Resource clusterResource,
-      ResourceLimits currentResourceLimits) {
+  private Resource getCurrentLimitResource(String nodeLabel,
+      Resource clusterResource, ResourceLimits currentResourceLimits) {
     /*
-     * Queue's max available resource = min(my.max, my.limit)
-     * my.limit is set by my parent, considered used resource of my siblings
+     * Current limit resource: For labeled resource: limit = queue-max-resource
+     * (TODO, this part need update when we support labeled-limit) For
+     * non-labeled resource: limit = min(queue-max-resource,
+     * limit-set-by-parent)
      */
     Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
-            queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
-    Resource queueCurrentResourceLimit =
-        Resources.min(resourceCalculator, clusterResource, queueMaxResource,
-            currentResourceLimits.getLimit());
-    queueCurrentResourceLimit =
-        Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
-            minimumAllocation);
-    return queueCurrentResourceLimit;
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            labelManager.getResourceByLabel(nodeLabel, clusterResource),
+            queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
+    if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
+      return Resources.min(resourceCalculator, clusterResource,
+          queueMaxResource, currentResourceLimits.getLimit());
+    }
+    return queueMaxResource;
+  }
+  
+  synchronized boolean canAssignToThisQueue(Resource clusterResource,
+      Set<String> nodeLabels, ResourceLimits currentResourceLimits,
+      Resource nowRequired, Resource resourceCouldBeUnreserved) {
+    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
+    Set<String> labelCanAccess;
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      labelCanAccess = new HashSet<String>();
+      // Any queue can always access any node without label
+      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+    } else {
+      labelCanAccess = new HashSet<String>(
+          accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+              : Sets.intersection(accessibleLabels, nodeLabels));
+    }
+    
+    for (String label : labelCanAccess) {
+      // New total resource = used + required
+      Resource newTotalResource =
+          Resources.add(queueUsage.getUsed(label), nowRequired);
+
+      Resource currentLimitResource =
+          getCurrentLimitResource(label, clusterResource, currentResourceLimits);
+
+      // if reservation continous looking enabled, check to see if could we
+      // potentially use this node instead of a reserved node if the application
+      // has reserved containers.
+      // TODO, now only consider reservation cases when the node has no label
+      if (this.reservationsContinueLooking
+          && label.equals(RMNodeLabelsManager.NO_LABEL)
+          && Resources.greaterThan(resourceCalculator, clusterResource,
+              resourceCouldBeUnreserved, Resources.none())) {
+        // resource-without-reserved = used - reserved
+        Resource newTotalWithoutReservedResource =
+            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+        
+        // when total-used-without-reserved-resource < currentLimit, we still
+        // have chance to allocate on this node by unreserving some containers
+        if (Resources.lessThan(resourceCalculator, clusterResource,
+            newTotalWithoutReservedResource, currentLimitResource)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("try to use reserved: " + getQueueName()
+                + " usedResources: " + queueUsage.getUsed()
+                + ", clusterResources: " + clusterResource
+                + ", reservedResources: " + resourceCouldBeUnreserved
+                + ", capacity-without-reserved: "
+                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+                + currentLimitResource); 
+          }
+          return true;
+        }
+      }
+      
+      // Otherwise, if any of the label of this node beyond queue limit, we
+      // cannot allocate on this node. Consider a small epsilon here.
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          newTotalResource, currentLimitResource)) {
+        return false;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getQueueName()
+            + "Check assign to queue, label=" + label
+            + " usedResources: " + queueUsage.getUsed(label)
+            + " clusterResources: " + clusterResource
+            + " currentUsedCapacity "
+            + Resources.divide(resourceCalculator, clusterResource,
+                queueUsage.getUsed(label),
+                labelManager.getResourceByLabel(label, clusterResource))
+            + " max-capacity: "
+            + queueCapacities.getAbsoluteMaximumCapacity(label)
+            + ")");
+      }
+      return true;
+    }
+    
+    // Actually, this will not happen, since labelCanAccess will be always
+    // non-empty
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 0a60acc..1a9448a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
-   * @param needToUnreserve assign container only if it can unreserve one first
    * @param resourceLimits how much overall resource of this queue can use. 
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits resourceLimits);
+      FiCaSchedulerNode node, ResourceLimits resourceLimits);
   
   /**
    * A container assigned to the queue has completed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 756e537..c86c0ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1061,9 +1061,14 @@ public class CapacityScheduler extends
           node.getNodeID());
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      CSAssignment assignment = queue.assignContainers(clusterResource, node,
-          false, new ResourceLimits(
-              clusterResource));
+      CSAssignment assignment =
+          queue.assignContainers(
+              clusterResource,
+              node,
+              // TODO, now we only consider limits for parent for non-labeled
+              // resources, should consider labeled resources as well.
+              new ResourceLimits(labelManager.getResourceByLabel(
+                  RMNodeLabelsManager.NO_LABEL, clusterResource)));
       
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
@@ -1087,8 +1092,13 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
-        root.assignContainers(clusterResource, node, false, new ResourceLimits(
-            clusterResource));
+        root.assignContainers(
+            clusterResource,
+            node,
+            // TODO, now we only consider limits for parent for non-labeled
+            // resources, should consider labeled resources as well.
+            new ResourceLimits(labelManager.getResourceByLabel(
+                RMNodeLabelsManager.NO_LABEL, clusterResource)));
       }
     } else {
       LOG.info("Skipping scheduling since node " + node.getNodeID() + 
@@ -1209,6 +1219,13 @@ public class CapacityScheduler extends
         usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
+    
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
     int numNodes = numNodeManagers.incrementAndGet();
@@ -1220,12 +1237,6 @@ public class CapacityScheduler extends
     if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
-    
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
-    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a607a62..dd6a894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
 
 @Private
 @Unstable
@@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
     // and all queues may not be realized yet, we'll use (optimistic) 
     // absoluteMaxCapacity (it will be replaced with the more accurate 
     // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     userLimit = conf.getUserLimit(getQueuePath());
@@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue {
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits currentResourceLimits) {
-    this.currentResourceLimits = currentResourceLimits;
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
@@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue {
             continue;
           }
           if (!this.reservationsContinueLooking) {
-            if (!needContainers(application, priority, required)) {
+            if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("doesn't need containers based on reservation algo!");
               }
@@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
                   required, requestedNodeLabels);          
           
           // Check queue max-capacity limit
-          if (!canAssignToThisQueue(clusterResource, required,
-              node.getLabels(), application, true)) {
+          if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
+              this.currentResourceLimits, required, application.getCurrentReservation())) {
             return NULL_ASSIGNMENT;
           }
 
@@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue {
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
-                null, needToUnreserve);
+                null);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Try to assign if we have sufficient resources
     assignContainersOnNode(clusterResource, node, application, priority, 
-        rmContainer, false);
+        rmContainer);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
     return headroom;
   }
-
-  synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Resource required, Set<String> nodeLabels, FiCaSchedulerApp application, 
-      boolean checkReservations) {
-    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
-    Set<String> labelCanAccess;
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      labelCanAccess = new HashSet<String>();
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    } else {
-      labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
-    }
-    
-    boolean canAssign = true;
-    for (String label : labelCanAccess) {
-      Resource potentialTotalCapacity =
-          Resources.add(queueUsage.getUsed(label), required);
-      
-      float potentialNewCapacity =
-          Resources.divide(resourceCalculator, clusterResource,
-              potentialTotalCapacity,
-              labelManager.getResourceByLabel(label, clusterResource));
-      // if enabled, check to see if could we potentially use this node instead
-      // of a reserved node if the application has reserved containers
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking && checkReservations
-          && label.equals(RMNodeLabelsManager.NO_LABEL)) {
-        float potentialNewWithoutReservedCapacity = Resources.divide(
-            resourceCalculator,
-            clusterResource,
-            Resources.subtract(potentialTotalCapacity,
-               application.getCurrentReservation()),
-            labelManager.getResourceByLabel(label, clusterResource));
-
-        if (potentialNewWithoutReservedCapacity <= queueCapacities
-            .getAbsoluteMaximumCapacity()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("try to use reserved: "
-                + getQueueName()
-                + " usedResources: "
-                + queueUsage.getUsed()
-                + " clusterResources: "
-                + clusterResource
-                + " reservedResources: "
-                + application.getCurrentReservation()
-                + " currentCapacity "
-                + Resources.divide(resourceCalculator, clusterResource,
-                    queueUsage.getUsed(), clusterResource) + " required " + required
-                + " potentialNewWithoutReservedCapacity: "
-                + potentialNewWithoutReservedCapacity + " ( "
-                + " max-capacity: "
-                + queueCapacities.getAbsoluteMaximumCapacity() + ")");
-          }
-          // we could potentially use this node instead of reserved node
-          return true;
-        }
-      }
-      
-      // Otherwise, if any of the label of this node beyond queue limit, we
-      // cannot allocate on this node. Consider a small epsilon here.
-      if (potentialNewCapacity > queueCapacities
-          .getAbsoluteMaximumCapacity(label) + 1e-4) {
-        canAssign = false;
-        break;
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + "Check assign to queue, label=" + label
-            + " usedResources: " + queueUsage.getUsed(label)
-            + " clusterResources: " + clusterResource
-            + " currentCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
-                queueUsage.getUsed(label),
-                labelManager.getResourceByLabel(label, clusterResource))
-            + " potentialNewCapacity: " + potentialNewCapacity + " ( "
-            + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
-            + ")");
-      }
-    }
-    
-    return canAssign;
-  }
   
-  private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+  private void setQueueResourceLimitsInfo(
       Resource clusterResource) {
-    Resource queueCurrentResourceLimit =
-        getCurrentResourceLimit(clusterResource, currentResourceLimits);
-    
     synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+      queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+          .getLimit());
       queueResourceLimitsInfo.setClusterResource(clusterResource);
     }
-
-    return queueCurrentResourceLimit;
   }
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue {
         computeUserLimit(application, clusterResource, required,
             queueUser, requestedLabels);
 
-    Resource currentResourceLimit =
-        computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
+        getHeadroom(queueUser, currentResourceLimits.getLimit(),
+            clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxAvailRes=" + currentResourceLimit + 
+          " queueMaxAvailRes=" + currentResourceLimits.getLimit() + 
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
@@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
-  boolean needContainers(FiCaSchedulerApp application, Priority priority,
-      Resource required) {
+  boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
+      Priority priority, Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
     int starvation = 0;
@@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
+      RMContainer reservedContainer) {
     Resource assigned = Resources.none();
 
     NodeType requestType = null;
@@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue {
       requestType = NodeType.NODE_LOCAL;
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
@@ -1280,7 +1190,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
@@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
 
       // update locality statistics
@@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue {
     
     return SKIP_ASSIGNMENT;
   }
+  
+  private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
+    // First we need to get minimum resource we need unreserve
+    // minimum-resource-need-unreserve = used + asked - limit
+    Resource minimumUnreservedResource =
+        Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
+            currentResourceLimits.getLimit());
+    return minimumUnreservedResource;
+  }
 
   @Private
   protected boolean findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource capability) {
+      Resource askedResource, Resource minimumUnreservedResource) {
     // need to unreserve some other container first
-    NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
+    NodeId idToUnreserve =
+        application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
+            resourceCalculator, clusterResource);
     if (idToUnreserve == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("checked to see if could unreserve for app but nothing "
@@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("unreserving for app: " + application.getApplicationId()
         + " on nodeId: " + idToUnreserve
         + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + capability);
+        + node.getNodeID() + " needing: " + askedResource);
     }
 
     // headroom
@@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability,
-      boolean needToUnreserve) {
-    if (needToUnreserve) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("we needed to unreserve to be able to allocate");
-      }
-      return false;
-    }
-
+      FiCaSchedulerApp application, Resource capability) {
     // we can't reserve if we got here based on the limit
     // checks assuming we could unreserve!!!
     Resource userLimit = computeUserLimitAndSetHeadroom(application,
@@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Check queue max-capacity limit,
     // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
+    if (!canAssignToThisQueue(clusterResource, null,
+        this.currentResourceLimits, capability, Resources.none())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit queue limit");
       }
@@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+  private Resource assignRackLocalContainers(Resource clusterResource,
+      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+  private Resource assignOffSwitchContainers(Resource clusterResource,
+      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
@@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      boolean needToUnreserve, MutableObject createdContainer) {
+      MutableObject createdContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
         + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type
-        + " needToUnreserve= " + needToUnreserve);
+        + " request=" + request + " type=" + type);
     }
     
     // check if the resource request can access the label
@@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue {
     Resource available = node.getAvailableResource();
     Resource totalResource = node.getTotalResource();
 
-    if (!Resources.fitsIn(capability, totalResource)) {
+    if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+        capability, totalResource)) {
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
       return Resources.none();
     }
+
     assert Resources.greaterThan(
         resourceCalculator, clusterResource, available, Resources.none());
 
@@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.warn("Couldn't get container for allocation!");
       return Resources.none();
     }
-
-    // default to true since if reservation continue look feature isn't on
-    // needContainers is checked earlier and we wouldn't have gotten this far
-    boolean canAllocContainer = true;
-    if (this.reservationsContinueLooking) {
-      // based on reservations can we allocate/reserve more or do we need
-      // to unreserve one first
-      canAllocContainer = needContainers(application, priority, capability);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("can alloc container is: " + canAllocContainer);
-      }
-    }
+    
+    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+        application, priority, capability);
 
     // Can we allocate a container on this node?
     int availableContainers = 
@@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue {
       // Did we previously reserve containers at this 'priority'?
       if (rmContainer != null) {
         unreserve(application, priority, node, rmContainer);
-      } else if (this.reservationsContinueLooking
-          && (!canAllocContainer || needToUnreserve)) {
-        // need to unreserve some other container first
-        boolean res = findNodeToUnreserve(clusterResource, node, application,
-            priority, capability);
-        if (!res) {
-          return Resources.none();
-        }
-      } else {
-        // we got here by possibly ignoring queue capacity limits. If the
-        // parameter needToUnreserve is true it means we ignored one of those
-        // limits in the chance we could unreserve. If we are here we aren't
-        // trying to unreserve so we can't allocate anymore due to that parent
-        // limit.
-        if (needToUnreserve) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("we needed to unreserve to be able to allocate, skipping");
+      } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
+        // when reservationsContinueLooking is set, we may need to unreserve
+        // some containers to meet this queue and its parents' resource limits
+        // TODO, need change here when we want to support continuous reservation
+        // looking for labeled partitions.
+        Resource minimumUnreservedResource =
+            getMinimumResourceNeedUnreserved(capability);
+        if (!shouldAllocOrReserveNewContainer
+            || Resources.greaterThan(resourceCalculator, clusterResource,
+                minimumUnreservedResource, Resources.none())) {
+          boolean containerUnreserved =
+              findNodeToUnreserve(clusterResource, node, application, priority,
+                  capability, minimumUnreservedResource);
+          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved 
+          // container (That means we *have to* unreserve some resource to
+          // continue)). If we failed to unreserve some resource,
+          if (!containerUnreserved) {
+            return Resources.none();
           }
-          return Resources.none();
         }
       }
 
@@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue {
     } else {
       // if we are allowed to allocate but this node doesn't have space, reserve it or
       // if this was an already a reserved container, reserve it again
-      if ((canAllocContainer) || (rmContainer != null)) {
-
-        if (reservationsContinueLooking) {
-          // we got here by possibly ignoring parent queue capacity limits. If
-          // the parameter needToUnreserve is true it means we ignored one of
-          // those limits in the chance we could unreserve. If we are here
-          // we aren't trying to unreserve so we can't allocate
-          // anymore due to that parent limit
-          boolean res = checkLimitsToReserve(clusterResource, application, capability, 
-              needToUnreserve);
-          if (!res) {
+      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+        if (reservationsContinueLooking && rmContainer == null) {
+          // we could possibly ignoring parent queue capacity limits when
+          // reservationsContinueLooking is set.
+          // If we're trying to reserve a container here, not container will be
+          // unreserved for reserving the new one. Check limits again before
+          // reserve the new container
+          if (!checkLimitsToReserve(clusterResource, 
+              application, capability)) {
             return Resources.none();
           }
         }
@@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
             queueCapacities.getAbsoluteCapacity(), minimumAllocation);
   }
+  
+  private void updateCurrentResourceLimits(
+      ResourceLimits currentResourceLimits, Resource clusterResource) {
+    // TODO: need consider non-empty node labels when resource limits supports
+    // node labels
+    // Even if ParentQueue will set limits respect child's max queue capacity,
+    // but when allocating reserved container, CapacityScheduler doesn't do
+    // this. So need cap limits by queue's max capacity here.
+    this.currentResourceLimits = currentResourceLimits;
+    Resource queueMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+            queueCapacities
+                .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
+            minimumAllocation);
+    this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+        clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+  }
 
   @Override
   public synchronized void updateClusterResource(Resource clusterResource,
       ResourceLimits currentResourceLimits) {
-    this.currentResourceLimits = currentResourceLimits;
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
     // Update headroom info based on new cluster resource value
     // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
     // during allocation
-    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 7feaa15..5ed6bb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.collect.Sets;
-
 @Private
 @Evolving
 public class ParentQueue extends AbstractCSQueue {
@@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits resourceLimits) {
+      FiCaSchedulerNode node, ResourceLimits resourceLimits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     Set<String> nodeLabels = node.getLabels();
@@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
           + getQueueName());
       }
       
-      boolean localNeedToUnreserve = false;
-      
       // Are we over maximum-capacity for this queue?
-      if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
-        // check to see if we could if we unreserve first
-        localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
-        if (!localNeedToUnreserve) {
-          break;
-        }
+      // This will also consider parent's limits and also continuous reservation
+      // looking
+      if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
+          minimumAllocation, Resources.createResource(getMetrics()
+              .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+        break;
       }
       
       // Schedule
       CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node,
-              localNeedToUnreserve | needToUnreserve, resourceLimits);
+          assignContainersToChildQueues(clusterResource, node, resourceLimits);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
     return assignment;
   }
 
-  private synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Set<String> nodeLabels) {
-    Set<String> labelCanAccess =
-        new HashSet<String>(
-            accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
-                : Sets.intersection(accessibleLabels, nodeLabels));
-    if (nodeLabels.isEmpty()) {
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    }
-    
-    boolean canAssign = true;
-    for (String label : labelCanAccess) {
-      float currentAbsoluteLabelUsedCapacity =
-          Resources.divide(resourceCalculator, clusterResource,
-              queueUsage.getUsed(label),
-              labelManager.getResourceByLabel(label, clusterResource));
-      // if any of the label doesn't beyond limit, we can allocate on this node
-      if (currentAbsoluteLabelUsedCapacity >= 
-            queueCapacities.getAbsoluteMaximumCapacity(label)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
-              + " current-capacity (" + queueUsage.getUsed(label) + ") "
-              + " >= max-capacity ("
-              + labelManager.getResourceByLabel(label, clusterResource) + ")");
-        }
-        canAssign = false;
-        break;
-      }
-    }
-    
-    return canAssign;
-  }
-
-  
-  private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
-    if (this.reservationsContinueLooking) {      
-      // check to see if we could potentially use this node instead of a reserved
-      // node
-
-      Resource reservedResources = Resources.createResource(getMetrics()
-          .getReservedMB(), getMetrics().getReservedVirtualCores());
-      float capacityWithoutReservedCapacity = Resources.divide(
-          resourceCalculator, clusterResource,
-          Resources.subtract(queueUsage.getUsed(), reservedResources),
-          clusterResource);
-
-      if (capacityWithoutReservedCapacity <= queueCapacities
-          .getAbsoluteMaximumCapacity()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("parent: try to use reserved: " + getQueueName()
-            + " usedResources: " + queueUsage.getUsed().getMemory()
-            + " clusterResources: " + clusterResource.getMemory()
-            + " reservedResources: " + reservedResources.getMemory()
-            + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
-            / clusterResource.getMemory()
-            + " potentialNewWithoutReservedCapacity: "
-            + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
-            + queueCapacities.getAbsoluteMaximumCapacity() + ")");
-        }
-        // we could potentially use this node instead of reserved node
-        return true;
-      }
-    }
-    return false;
-   }
-
-  
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) && 
         Resources.greaterThanOrEqual(resourceCalculator, clusterResource, 
@@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   private ResourceLimits getResourceLimitsOfChild(CSQueue child,
-      Resource clusterResource, ResourceLimits myLimits) {
-    /*
-     * Set head-room of a given child, limit =
-     * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
-     * + child.used. To avoid any of this queue's and its ancestors' limit
-     * being violated
-     */
-    Resource myCurrentLimit =
-        getCurrentResourceLimit(clusterResource, myLimits);
-    // My available resource = my-current-limit - my-used-resource
-    Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
-        getUsedResources());
-    // Child's limit = my-available-resource + resource-already-used-by-child
+      Resource clusterResource, ResourceLimits parentLimits) {
+    // Set resource-limit of a given child, child.limit =
+    // min(my.limit - my.used + child.used, child.max)
+
+    // Parent available resource = parent-limit - parent-used-resource
+    Resource parentMaxAvailableResource =
+        Resources.subtract(parentLimits.getLimit(), getUsedResources());
+
+    // Child's limit = parent-available-resource + child-used
     Resource childLimit =
-        Resources.add(myMaxAvailableResource, child.getUsedResources());
-    
+        Resources.add(parentMaxAvailableResource, child.getUsedResources());
+
+    // Get child's max resource
+    Resource childConfiguredMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+            child.getAbsoluteMaximumCapacity(), minimumAllocation);
+
+    // Child's limit should be capped by child configured max resource
+    childLimit =
+        Resources.min(resourceCalculator, clusterResource, childLimit,
+            childConfiguredMaxResource);
+
+    // Normalize before return
+    childLimit =
+        Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
+
     return new ResourceLimits(childLimit);
   }
   
   private synchronized CSAssignment assignContainersToChildQueues(
-      Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits limits) {
+      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, limits);
       
-      assignment =
-          childQueue.assignContainers(cluster, node, needToUnreserve,
-              childLimits);
+      assignment = childQueue.assignContainers(cluster, node, childLimits);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9f97b13..6cc2777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
   
   synchronized public NodeId getNodeIdToUnreserve(Priority priority,
-      Resource capability) {
+      Resource resourceNeedUnreserve, ResourceCalculator rc,
+      Resource clusterResource) {
 
     // first go around make this algorithm simple and just grab first
     // reservation that has enough resources
@@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
       for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
+        NodeId nodeId = entry.getKey();
+        Resource containerResource = entry.getValue().getContainer().getResource();
+        
         // make sure we unreserve one with at least the same amount of
         // resources, otherwise could affect capacity limits
-        if (Resources.fitsIn(capability, entry.getValue().getContainer()
-            .getResource())) {
+        if (Resources.lessThanOrEqual(rc, clusterResource,
+            resourceNeedUnreserve, containerResource)) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("unreserving node with reservation size: "
-                + entry.getValue().getContainer().getResource()
-                + " in order to allocate container with size: " + capability);
+                + containerResource
+                + " in order to allocate container with size: " + resourceNeedUnreserve);
           }
-          return entry.getKey();
+          return nodeId;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 8cad057..1ca5c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -611,7 +611,7 @@ public class TestApplicationLimits {
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -631,7 +631,7 @@ public class TestApplicationLimits {
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -651,7 +651,7 @@ public class TestApplicationLimits {
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -660,7 +660,7 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 83ab104..7a265dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -2483,6 +2484,64 @@ public class TestCapacityScheduler {
     Assert.assertEquals(30 * GB,
         am1.doHeartbeat().getAvailableResources().getMemory());
   }
+  
+  @Test
+  public void testParentQueueMaxCapsAreRespected() throws Exception {
+    /*
+     * Queue tree:
+     *          Root
+     *        /     \
+     *       A       B
+     *      / \
+     *     A1 A2 
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacity(A, 50);
+    csConf.setMaximumCapacity(A, 50);
+    csConf.setCapacity(B, 50);
+    
+    // Define 2nd-level queues
+    csConf.setQueues(A, new String[] {"a1", "a2"});
+    csConf.setCapacity(A1, 50);
+    csConf.setUserLimitFactor(A1, 100.0f);
+    csConf.setCapacity(A2, 50);
+    csConf.setUserLimitFactor(A2, 100.0f);
+    csConf.setCapacity(B1, B1_CAPACITY);
+    csConf.setUserLimitFactor(B1, 100.0f);
+    
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    
+    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB 
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
+    
+    // Try to launch app2 in a2, asked 2GB, should success 
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    try {
+      // Try to allocate a container, a's usage=11G/max=12
+      // a1's usage=9G/max=12
+      // a2's usage=2G/max=12
+      // In this case, if a2 asked 2G, should fail.
+      waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
+    } catch (AssertionError failure) {
+      // Expected, return;
+      return;
+    }
+    Assert.fail("Shouldn't successfully allocate containers for am2, "
+        + "queue-a's max capacity will be violated if container allocated");
+  }
 
   private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7edb17d..71dc523 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -145,7 +144,7 @@ public class TestChildQueueOrder {
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
           when(queue)
-              .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+              .assignContainers(eq(clusterResource), eq(node),
                   any(ResourceLimits.class));
 
           // Mock the node's resource availability
@@ -157,7 +156,7 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), 
+    when(queue).assignContainers(eq(clusterResource), eq(node), 
         any(ResourceLimits.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
@@ -274,7 +273,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     for(int i=0; i < 2; i++)
     {
@@ -282,7 +281,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     } 
     for(int i=0; i < 3; i++)
@@ -291,7 +290,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }  
     for(int i=0; i < 4; i++)
@@ -300,7 +299,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
@@ -334,7 +333,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
@@ -362,7 +361,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
@@ -389,7 +388,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -404,13 +403,13 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);


Mime
View raw message