hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [2/2] hadoop git commit: YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan. (cherry picked from commit 487374b7fe0c92fc7eb1406c568952722b5d5b15)
Date Tue, 17 Mar 2015 17:25:18 GMT
YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.
(cherry picked from commit 487374b7fe0c92fc7eb1406c568952722b5d5b15)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c601e49
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c601e49
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c601e49

Branch: refs/heads/branch-2
Commit: 1c601e492f4cd80e012aa78b796383ee9de161fd
Parents: 895588b
Author: Jian He <jianhe@apache.org>
Authored: Tue Mar 17 10:22:15 2015 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Tue Mar 17 10:25:07 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     | 112 ++++++-
 .../scheduler/capacity/CSQueue.java             |   4 +-
 .../scheduler/capacity/CapacityScheduler.java   |  33 ++-
 .../scheduler/capacity/LeafQueue.java           | 292 +++++++------------
 .../scheduler/capacity/ParentQueue.java         | 140 +++------
 .../scheduler/common/fica/FiCaSchedulerApp.java |  16 +-
 .../capacity/TestApplicationLimits.java         |   8 +-
 .../capacity/TestCapacityScheduler.java         |  59 ++++
 .../scheduler/capacity/TestChildQueueOrder.java |  25 +-
 .../scheduler/capacity/TestLeafQueue.java       | 142 ++++-----
 .../scheduler/capacity/TestParentQueue.java     |  97 +++---
 .../scheduler/capacity/TestReservations.java    | 147 +++++-----
 13 files changed, 561 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b2f25cd..e15fdf2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -8,6 +8,9 @@ Release 2.8.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    YARN-3243. CapacityScheduler should pass headroom from parent to children
+    to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index d800709..4e53060 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.collect.Sets;
 
 public abstract class AbstractCSQueue implements CSQueue {
+  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
   
   CSQueue parent;
   final String queueName;
@@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
                                         parentQ.getPreemptionDisabled());
   }
   
-  protected Resource getCurrentResourceLimit(Resource clusterResource,
-      ResourceLimits currentResourceLimits) {
+  private Resource getCurrentLimitResource(String nodeLabel,
+      Resource clusterResource, ResourceLimits currentResourceLimits) {
     /*
-     * Queue's max available resource = min(my.max, my.limit)
-     * my.limit is set by my parent, considered used resource of my siblings
+     * Current limit resource: For labeled resource: limit = queue-max-resource
+     * (TODO: this part needs updating when we support labeled-limit). For
+     * non-labeled resource: limit = min(queue-max-resource,
+     * limit-set-by-parent).
      */
     Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
-            queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
-    Resource queueCurrentResourceLimit =
-        Resources.min(resourceCalculator, clusterResource, queueMaxResource,
-            currentResourceLimits.getLimit());
-    queueCurrentResourceLimit =
-        Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
-            minimumAllocation);
-    return queueCurrentResourceLimit;
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            labelManager.getResourceByLabel(nodeLabel, clusterResource),
+            queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
+    if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
+      return Resources.min(resourceCalculator, clusterResource,
+          queueMaxResource, currentResourceLimits.getLimit());
+    }
+    return queueMaxResource;
+  }
+  
+  synchronized boolean canAssignToThisQueue(Resource clusterResource,
+      Set<String> nodeLabels, ResourceLimits currentResourceLimits,
+      Resource nowRequired, Resource resourceCouldBeUnreserved) {
+    // Get the labels this queue can access: (nodeLabel AND queueLabel)
+    Set<String> labelCanAccess;
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      labelCanAccess = new HashSet<String>();
+      // Any queue can always access any node without label
+      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+    } else {
+      labelCanAccess = new HashSet<String>(
+          accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+              : Sets.intersection(accessibleLabels, nodeLabels));
+    }
+    
+    for (String label : labelCanAccess) {
+      // New total resource = used + required
+      Resource newTotalResource =
+          Resources.add(queueUsage.getUsed(label), nowRequired);
+
+      Resource currentLimitResource =
+          getCurrentLimitResource(label, clusterResource, currentResourceLimits);
+
+      // if reservation continuous looking is enabled, check whether we could
+      // potentially use this node instead of a reserved node if the application
+      // has reserved containers.
+      // TODO, now only consider reservation cases when the node has no label
+      if (this.reservationsContinueLooking
+          && label.equals(RMNodeLabelsManager.NO_LABEL)
+          && Resources.greaterThan(resourceCalculator, clusterResource,
+              resourceCouldBeUnreserved, Resources.none())) {
+        // resource-without-reserved = used - reserved
+        Resource newTotalWithoutReservedResource =
+            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+        
+        // when total-used-without-reserved-resource < currentLimit, we still
+        // have a chance to allocate on this node by unreserving some containers
+        if (Resources.lessThan(resourceCalculator, clusterResource,
+            newTotalWithoutReservedResource, currentLimitResource)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("try to use reserved: " + getQueueName()
+                + " usedResources: " + queueUsage.getUsed()
+                + ", clusterResources: " + clusterResource
+                + ", reservedResources: " + resourceCouldBeUnreserved
+                + ", capacity-without-reserved: "
+                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+                + currentLimitResource); 
+          }
+          return true;
+        }
+      }
+      
+      // Otherwise, if any label of this node is beyond the queue limit, we
+      // cannot allocate on this node. Consider a small epsilon here.
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          newTotalResource, currentLimitResource)) {
+        return false;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getQueueName()
+            + "Check assign to queue, label=" + label
+            + " usedResources: " + queueUsage.getUsed(label)
+            + " clusterResources: " + clusterResource
+            + " currentUsedCapacity "
+            + Resources.divide(resourceCalculator, clusterResource,
+                queueUsage.getUsed(label),
+                labelManager.getResourceByLabel(label, clusterResource))
+            + " max-capacity: "
+            + queueCapacities.getAbsoluteMaximumCapacity(label)
+            + ")");
+      }
+      return true;
+    }
+    
+    // Actually, this will not happen, since labelCanAccess will be always
+    // non-empty
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 0a60acc..1a9448a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
-   * @param needToUnreserve assign container only if it can unreserve one first
    * @param resourceLimits how much overall resource of this queue can use. 
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits resourceLimits);
+      FiCaSchedulerNode node, ResourceLimits resourceLimits);
   
   /**
    * A container assigned to the queue has completed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 756e537..c86c0ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1061,9 +1061,14 @@ public class CapacityScheduler extends
           node.getNodeID());
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      CSAssignment assignment = queue.assignContainers(clusterResource, node,
-          false, new ResourceLimits(
-              clusterResource));
+      CSAssignment assignment =
+          queue.assignContainers(
+              clusterResource,
+              node,
+              // TODO, now we only consider limits for parent for non-labeled
+              // resources, should consider labeled resources as well.
+              new ResourceLimits(labelManager.getResourceByLabel(
+                  RMNodeLabelsManager.NO_LABEL, clusterResource)));
       
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
@@ -1087,8 +1092,13 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
-        root.assignContainers(clusterResource, node, false, new ResourceLimits(
-            clusterResource));
+        root.assignContainers(
+            clusterResource,
+            node,
+            // TODO, now we only consider limits for parent for non-labeled
+            // resources, should consider labeled resources as well.
+            new ResourceLimits(labelManager.getResourceByLabel(
+                RMNodeLabelsManager.NO_LABEL, clusterResource)));
       }
     } else {
       LOG.info("Skipping scheduling since node " + node.getNodeID() + 
@@ -1209,6 +1219,13 @@ public class CapacityScheduler extends
         usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
+    
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
     int numNodes = numNodeManagers.incrementAndGet();
@@ -1220,12 +1237,6 @@ public class CapacityScheduler extends
     if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
-    
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
-    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a607a62..dd6a894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
 
 @Private
 @Unstable
@@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
     // and all queues may not be realized yet, we'll use (optimistic) 
     // absoluteMaxCapacity (it will be replaced with the more accurate 
     // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     userLimit = conf.getUserLimit(getQueuePath());
@@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue {
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits currentResourceLimits) {
-    this.currentResourceLimits = currentResourceLimits;
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
@@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue {
             continue;
           }
           if (!this.reservationsContinueLooking) {
-            if (!needContainers(application, priority, required)) {
+            if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("doesn't need containers based on reservation algo!");
               }
@@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
                   required, requestedNodeLabels);          
           
           // Check queue max-capacity limit
-          if (!canAssignToThisQueue(clusterResource, required,
-              node.getLabels(), application, true)) {
+          if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
+              this.currentResourceLimits, required, application.getCurrentReservation())) {
             return NULL_ASSIGNMENT;
           }
 
@@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue {
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
-                null, needToUnreserve);
+                null);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Try to assign if we have sufficient resources
     assignContainersOnNode(clusterResource, node, application, priority, 
-        rmContainer, false);
+        rmContainer);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
     return headroom;
   }
-
-  synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Resource required, Set<String> nodeLabels, FiCaSchedulerApp application, 
-      boolean checkReservations) {
-    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
-    Set<String> labelCanAccess;
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      labelCanAccess = new HashSet<String>();
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    } else {
-      labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
-    }
-    
-    boolean canAssign = true;
-    for (String label : labelCanAccess) {
-      Resource potentialTotalCapacity =
-          Resources.add(queueUsage.getUsed(label), required);
-      
-      float potentialNewCapacity =
-          Resources.divide(resourceCalculator, clusterResource,
-              potentialTotalCapacity,
-              labelManager.getResourceByLabel(label, clusterResource));
-      // if enabled, check to see if could we potentially use this node instead
-      // of a reserved node if the application has reserved containers
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking && checkReservations
-          && label.equals(RMNodeLabelsManager.NO_LABEL)) {
-        float potentialNewWithoutReservedCapacity = Resources.divide(
-            resourceCalculator,
-            clusterResource,
-            Resources.subtract(potentialTotalCapacity,
-               application.getCurrentReservation()),
-            labelManager.getResourceByLabel(label, clusterResource));
-
-        if (potentialNewWithoutReservedCapacity <= queueCapacities
-            .getAbsoluteMaximumCapacity()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("try to use reserved: "
-                + getQueueName()
-                + " usedResources: "
-                + queueUsage.getUsed()
-                + " clusterResources: "
-                + clusterResource
-                + " reservedResources: "
-                + application.getCurrentReservation()
-                + " currentCapacity "
-                + Resources.divide(resourceCalculator, clusterResource,
-                    queueUsage.getUsed(), clusterResource) + " required " + required
-                + " potentialNewWithoutReservedCapacity: "
-                + potentialNewWithoutReservedCapacity + " ( "
-                + " max-capacity: "
-                + queueCapacities.getAbsoluteMaximumCapacity() + ")");
-          }
-          // we could potentially use this node instead of reserved node
-          return true;
-        }
-      }
-      
-      // Otherwise, if any of the label of this node beyond queue limit, we
-      // cannot allocate on this node. Consider a small epsilon here.
-      if (potentialNewCapacity > queueCapacities
-          .getAbsoluteMaximumCapacity(label) + 1e-4) {
-        canAssign = false;
-        break;
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + "Check assign to queue, label=" + label
-            + " usedResources: " + queueUsage.getUsed(label)
-            + " clusterResources: " + clusterResource
-            + " currentCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
-                queueUsage.getUsed(label),
-                labelManager.getResourceByLabel(label, clusterResource))
-            + " potentialNewCapacity: " + potentialNewCapacity + " ( "
-            + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
-            + ")");
-      }
-    }
-    
-    return canAssign;
-  }
   
-  private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+  private void setQueueResourceLimitsInfo(
       Resource clusterResource) {
-    Resource queueCurrentResourceLimit =
-        getCurrentResourceLimit(clusterResource, currentResourceLimits);
-    
     synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+      queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+          .getLimit());
       queueResourceLimitsInfo.setClusterResource(clusterResource);
     }
-
-    return queueCurrentResourceLimit;
   }
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue {
         computeUserLimit(application, clusterResource, required,
             queueUser, requestedLabels);
 
-    Resource currentResourceLimit =
-        computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
+        getHeadroom(queueUser, currentResourceLimits.getLimit(),
+            clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxAvailRes=" + currentResourceLimit + 
+          " queueMaxAvailRes=" + currentResourceLimits.getLimit() + 
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
@@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
-  boolean needContainers(FiCaSchedulerApp application, Priority priority,
-      Resource required) {
+  boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
+      Priority priority, Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
     int starvation = 0;
@@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
+      RMContainer reservedContainer) {
     Resource assigned = Resources.none();
 
     NodeType requestType = null;
@@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue {
       requestType = NodeType.NODE_LOCAL;
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
@@ -1280,7 +1190,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
@@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
 
       // update locality statistics
@@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue {
     
     return SKIP_ASSIGNMENT;
   }
+  
+  private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
+    // First we need to get the minimum resource we need to unreserve:
+    // minimum-resource-need-unreserve = used + asked - limit
+    Resource minimumUnreservedResource =
+        Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
+            currentResourceLimits.getLimit());
+    return minimumUnreservedResource;
+  }
 
   @Private
   protected boolean findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource capability) {
+      Resource askedResource, Resource minimumUnreservedResource) {
     // need to unreserve some other container first
-    NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
+    NodeId idToUnreserve =
+        application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
+            resourceCalculator, clusterResource);
     if (idToUnreserve == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("checked to see if could unreserve for app but nothing "
@@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("unreserving for app: " + application.getApplicationId()
         + " on nodeId: " + idToUnreserve
         + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + capability);
+        + node.getNodeID() + " needing: " + askedResource);
     }
 
     // headroom
@@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability,
-      boolean needToUnreserve) {
-    if (needToUnreserve) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("we needed to unreserve to be able to allocate");
-      }
-      return false;
-    }
-
+      FiCaSchedulerApp application, Resource capability) {
     // we can't reserve if we got here based on the limit
     // checks assuming we could unreserve!!!
     Resource userLimit = computeUserLimitAndSetHeadroom(application,
@@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Check queue max-capacity limit,
     // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
+    if (!canAssignToThisQueue(clusterResource, null,
+        this.currentResourceLimits, capability, Resources.none())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit queue limit");
       }
@@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+  private Resource assignRackLocalContainers(Resource clusterResource,
+      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+  private Resource assignOffSwitchContainers(Resource clusterResource,
+      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
@@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      boolean needToUnreserve, MutableObject createdContainer) {
+      MutableObject createdContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
         + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type
-        + " needToUnreserve= " + needToUnreserve);
+        + " request=" + request + " type=" + type);
     }
     
     // check if the resource request can access the label
@@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue {
     Resource available = node.getAvailableResource();
     Resource totalResource = node.getTotalResource();
 
-    if (!Resources.fitsIn(capability, totalResource)) {
+    if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+        capability, totalResource)) {
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
       return Resources.none();
     }
+
     assert Resources.greaterThan(
         resourceCalculator, clusterResource, available, Resources.none());
 
@@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.warn("Couldn't get container for allocation!");
       return Resources.none();
     }
-
-    // default to true since if reservation continue look feature isn't on
-    // needContainers is checked earlier and we wouldn't have gotten this far
-    boolean canAllocContainer = true;
-    if (this.reservationsContinueLooking) {
-      // based on reservations can we allocate/reserve more or do we need
-      // to unreserve one first
-      canAllocContainer = needContainers(application, priority, capability);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("can alloc container is: " + canAllocContainer);
-      }
-    }
+    
+    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+        application, priority, capability);
 
     // Can we allocate a container on this node?
     int availableContainers = 
@@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue {
       // Did we previously reserve containers at this 'priority'?
       if (rmContainer != null) {
         unreserve(application, priority, node, rmContainer);
-      } else if (this.reservationsContinueLooking
-          && (!canAllocContainer || needToUnreserve)) {
-        // need to unreserve some other container first
-        boolean res = findNodeToUnreserve(clusterResource, node, application,
-            priority, capability);
-        if (!res) {
-          return Resources.none();
-        }
-      } else {
-        // we got here by possibly ignoring queue capacity limits. If the
-        // parameter needToUnreserve is true it means we ignored one of those
-        // limits in the chance we could unreserve. If we are here we aren't
-        // trying to unreserve so we can't allocate anymore due to that parent
-        // limit.
-        if (needToUnreserve) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("we needed to unreserve to be able to allocate, skipping");
+      } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
+        // when reservationsContinueLooking is set, we may need to unreserve
+        // some containers to meet this queue and its parents' resource limits
+        // TODO, need change here when we want to support continuous reservation
+        // looking for labeled partitions.
+        Resource minimumUnreservedResource =
+            getMinimumResourceNeedUnreserved(capability);
+        if (!shouldAllocOrReserveNewContainer
+            || Resources.greaterThan(resourceCalculator, clusterResource,
+                minimumUnreservedResource, Resources.none())) {
+          boolean containerUnreserved =
+              findNodeToUnreserve(clusterResource, node, application, priority,
+                  capability, minimumUnreservedResource);
+          // When minimum-unreserved-resource > 0 OR we cannot allocate/reserve
+          // a new container, we *have to* unreserve some resource to
+          // continue. If we failed to unreserve any resource, we cannot continue.
+          if (!containerUnreserved) {
+            return Resources.none();
           }
-          return Resources.none();
         }
       }
 
@@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue {
     } else {
       // if we are allowed to allocate but this node doesn't have space, reserve it or
       // if this was an already a reserved container, reserve it again
-      if ((canAllocContainer) || (rmContainer != null)) {
-
-        if (reservationsContinueLooking) {
-          // we got here by possibly ignoring parent queue capacity limits. If
-          // the parameter needToUnreserve is true it means we ignored one of
-          // those limits in the chance we could unreserve. If we are here
-          // we aren't trying to unreserve so we can't allocate
-          // anymore due to that parent limit
-          boolean res = checkLimitsToReserve(clusterResource, application, capability, 
-              needToUnreserve);
-          if (!res) {
+      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+        if (reservationsContinueLooking && rmContainer == null) {
+          // We may have ignored parent queue capacity limits when
+          // reservationsContinueLooking is set.
+          // If we're trying to reserve a container here, no container will be
+          // unreserved to make room for the new one. Check limits again before
+          // reserving the new container.
+          if (!checkLimitsToReserve(clusterResource, 
+              application, capability)) {
             return Resources.none();
           }
         }
@@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
             queueCapacities.getAbsoluteCapacity(), minimumAllocation);
   }
+  
+  private void updateCurrentResourceLimits(
+      ResourceLimits currentResourceLimits, Resource clusterResource) {
+    // TODO: need to consider non-empty node labels when resource limits
+    // support node labels.
+    // ParentQueue sets limits that respect the child's max queue capacity,
+    // but CapacityScheduler doesn't do this when allocating a reserved
+    // container, so cap the limit by this queue's max capacity here.
+    this.currentResourceLimits = currentResourceLimits;
+    Resource queueMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+            queueCapacities
+                .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
+            minimumAllocation);
+    this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+        clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+  }
 
   @Override
   public synchronized void updateClusterResource(Resource clusterResource,
       ResourceLimits currentResourceLimits) {
-    this.currentResourceLimits = currentResourceLimits;
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
     // Update headroom info based on new cluster resource value
     // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
     // during allocation
-    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 7feaa15..5ed6bb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.collect.Sets;
-
 @Private
 @Evolving
 public class ParentQueue extends AbstractCSQueue {
@@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits resourceLimits) {
+      FiCaSchedulerNode node, ResourceLimits resourceLimits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     Set<String> nodeLabels = node.getLabels();
@@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
           + getQueueName());
       }
       
-      boolean localNeedToUnreserve = false;
-      
       // Are we over maximum-capacity for this queue?
-      if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
-        // check to see if we could if we unreserve first
-        localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
-        if (!localNeedToUnreserve) {
-          break;
-        }
+      // This will also consider parent's limits and also continuous reservation
+      // looking
+      if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
+          minimumAllocation, Resources.createResource(getMetrics()
+              .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+        break;
       }
       
       // Schedule
       CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node,
-              localNeedToUnreserve | needToUnreserve, resourceLimits);
+          assignContainersToChildQueues(clusterResource, node, resourceLimits);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
     return assignment;
   }
 
-  private synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Set<String> nodeLabels) {
-    Set<String> labelCanAccess =
-        new HashSet<String>(
-            accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
-                : Sets.intersection(accessibleLabels, nodeLabels));
-    if (nodeLabels.isEmpty()) {
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    }
-    
-    boolean canAssign = true;
-    for (String label : labelCanAccess) {
-      float currentAbsoluteLabelUsedCapacity =
-          Resources.divide(resourceCalculator, clusterResource,
-              queueUsage.getUsed(label),
-              labelManager.getResourceByLabel(label, clusterResource));
-      // if any of the label doesn't beyond limit, we can allocate on this node
-      if (currentAbsoluteLabelUsedCapacity >= 
-            queueCapacities.getAbsoluteMaximumCapacity(label)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
-              + " current-capacity (" + queueUsage.getUsed(label) + ") "
-              + " >= max-capacity ("
-              + labelManager.getResourceByLabel(label, clusterResource) + ")");
-        }
-        canAssign = false;
-        break;
-      }
-    }
-    
-    return canAssign;
-  }
-
-  
-  private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
-    if (this.reservationsContinueLooking) {      
-      // check to see if we could potentially use this node instead of a reserved
-      // node
-
-      Resource reservedResources = Resources.createResource(getMetrics()
-          .getReservedMB(), getMetrics().getReservedVirtualCores());
-      float capacityWithoutReservedCapacity = Resources.divide(
-          resourceCalculator, clusterResource,
-          Resources.subtract(queueUsage.getUsed(), reservedResources),
-          clusterResource);
-
-      if (capacityWithoutReservedCapacity <= queueCapacities
-          .getAbsoluteMaximumCapacity()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("parent: try to use reserved: " + getQueueName()
-            + " usedResources: " + queueUsage.getUsed().getMemory()
-            + " clusterResources: " + clusterResource.getMemory()
-            + " reservedResources: " + reservedResources.getMemory()
-            + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
-            / clusterResource.getMemory()
-            + " potentialNewWithoutReservedCapacity: "
-            + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
-            + queueCapacities.getAbsoluteMaximumCapacity() + ")");
-        }
-        // we could potentially use this node instead of reserved node
-        return true;
-      }
-    }
-    return false;
-   }
-
-  
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) && 
         Resources.greaterThanOrEqual(resourceCalculator, clusterResource, 
@@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   private ResourceLimits getResourceLimitsOfChild(CSQueue child,
-      Resource clusterResource, ResourceLimits myLimits) {
-    /*
-     * Set head-room of a given child, limit =
-     * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
-     * + child.used. To avoid any of this queue's and its ancestors' limit
-     * being violated
-     */
-    Resource myCurrentLimit =
-        getCurrentResourceLimit(clusterResource, myLimits);
-    // My available resource = my-current-limit - my-used-resource
-    Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
-        getUsedResources());
-    // Child's limit = my-available-resource + resource-already-used-by-child
+      Resource clusterResource, ResourceLimits parentLimits) {
+    // Set resource-limit of a given child, child.limit =
+    // min(my.limit - my.used + child.used, child.max)
+
+    // Parent available resource = parent-limit - parent-used-resource
+    Resource parentMaxAvailableResource =
+        Resources.subtract(parentLimits.getLimit(), getUsedResources());
+
+    // Child's limit = parent-available-resource + child-used
     Resource childLimit =
-        Resources.add(myMaxAvailableResource, child.getUsedResources());
-    
+        Resources.add(parentMaxAvailableResource, child.getUsedResources());
+
+    // Get child's max resource
+    Resource childConfiguredMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+            child.getAbsoluteMaximumCapacity(), minimumAllocation);
+
+    // Child's limit should be capped by child configured max resource
+    childLimit =
+        Resources.min(resourceCalculator, clusterResource, childLimit,
+            childConfiguredMaxResource);
+
+    // Normalize before return
+    childLimit =
+        Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
+
     return new ResourceLimits(childLimit);
   }
   
   private synchronized CSAssignment assignContainersToChildQueues(
-      Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits limits) {
+      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, limits);
       
-      assignment =
-          childQueue.assignContainers(cluster, node, needToUnreserve,
-              childLimits);
+      assignment = childQueue.assignContainers(cluster, node, childLimits);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9f97b13..6cc2777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
   
   synchronized public NodeId getNodeIdToUnreserve(Priority priority,
-      Resource capability) {
+      Resource resourceNeedUnreserve, ResourceCalculator rc,
+      Resource clusterResource) {
 
     // first go around make this algorithm simple and just grab first
     // reservation that has enough resources
@@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
       for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
+        NodeId nodeId = entry.getKey();
+        Resource containerResource = entry.getValue().getContainer().getResource();
+        
         // make sure we unreserve one with at least the same amount of
         // resources, otherwise could affect capacity limits
-        if (Resources.fitsIn(capability, entry.getValue().getContainer()
-            .getResource())) {
+        if (Resources.lessThanOrEqual(rc, clusterResource,
+            resourceNeedUnreserve, containerResource)) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("unreserving node with reservation size: "
-                + entry.getValue().getContainer().getResource()
-                + " in order to allocate container with size: " + capability);
+                + containerResource
+                + " in order to allocate container with size: " + resourceNeedUnreserve);
           }
-          return entry.getKey();
+          return nodeId;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 8cad057..1ca5c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -611,7 +611,7 @@ public class TestApplicationLimits {
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -631,7 +631,7 @@ public class TestApplicationLimits {
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -651,7 +651,7 @@ public class TestApplicationLimits {
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -660,7 +660,7 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 83ab104..7a265dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -2483,6 +2484,64 @@ public class TestCapacityScheduler {
     Assert.assertEquals(30 * GB,
         am1.doHeartbeat().getAvailableResources().getMemory());
   }
+  
+  @Test
+  public void testParentQueueMaxCapsAreRespected() throws Exception {
+    /*
+     * Queue tree:
+     *          Root
+     *        /     \
+     *       A       B
+     *      / \
+     *     A1 A2 
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacity(A, 50);
+    csConf.setMaximumCapacity(A, 50);
+    csConf.setCapacity(B, 50);
+    
+    // Define 2nd-level queues
+    csConf.setQueues(A, new String[] {"a1", "a2"});
+    csConf.setCapacity(A1, 50);
+    csConf.setUserLimitFactor(A1, 100.0f);
+    csConf.setCapacity(A2, 50);
+    csConf.setUserLimitFactor(A2, 100.0f);
+    csConf.setCapacity(B1, B1_CAPACITY);
+    csConf.setUserLimitFactor(B1, 100.0f);
+    
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    
+    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB 
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
+    
+    // Try to launch app2 in a2, asking for 2GB; this should succeed
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    try {
+      // Try to allocate a container, a's usage=11G/max=12
+      // a1's usage=9G/max=12
+      // a2's usage=2G/max=12
+      // In this case, if a2 asks for another 2G, the allocation should fail.
+      waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
+    } catch (AssertionError failure) {
+      // Expected, return;
+      return;
+    }
+    Assert.fail("Shouldn't successfully allocate containers for am2, "
+        + "queue-a's max capacity will be violated if container allocated");
+  }
 
   private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c601e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7edb17d..71dc523 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -145,7 +144,7 @@ public class TestChildQueueOrder {
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
           when(queue)
-              .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+              .assignContainers(eq(clusterResource), eq(node),
                   any(ResourceLimits.class));
 
           // Mock the node's resource availability
@@ -157,7 +156,7 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), 
+    when(queue).assignContainers(eq(clusterResource), eq(node), 
         any(ResourceLimits.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
@@ -274,7 +273,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     for(int i=0; i < 2; i++)
     {
@@ -282,7 +281,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     } 
     for(int i=0; i < 3; i++)
@@ -291,7 +290,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }  
     for(int i=0; i < 4; i++)
@@ -300,7 +299,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
@@ -334,7 +333,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
@@ -362,7 +361,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
@@ -389,7 +388,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -404,13 +403,13 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);


Mime
View raw message