hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject hadoop git commit: YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation. (Thomas Graves via wangda)
Date Mon, 11 May 2015 22:17:25 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 1e4a686ea -> 84f7641f7


YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation.
(Thomas Graves via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84f7641f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84f7641f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84f7641f

Branch: refs/heads/branch-2.7
Commit: 84f7641f7eb1caadf28a36af9bd8e2f2e59cfe90
Parents: 1e4a686
Author: Wangda Tan <wangda@apache.org>
Authored: Mon May 11 15:13:39 2015 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Mon May 11 15:14:06 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/ResourceLimits.java               |  29 +++-
 .../scheduler/capacity/AbstractCSQueue.java     |  89 ++++++-----
 .../scheduler/capacity/LeafQueue.java           | 149 ++++++++-----------
 .../scheduler/capacity/TestReservations.java    |  65 +++++---
 5 files changed, 177 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84f7641f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 10b2a93..08cfb0c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -74,6 +74,9 @@ Release 2.7.1 - UNRELEASED
     YARN-3476. Nodemanager can fail to delete local logs if log aggregation
     fails (Rohith via jlowe)
 
+    YARN-3434. Interaction between reservations and userlimit can result in 
+    significant ULF violation. (Thomas Graves via wangda)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84f7641f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 12333e8..00107f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -19,22 +19,43 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Resource limits for queues/applications, this means max overall (please note
  * that, it's not "extra") resource you can get.
  */
 public class ResourceLimits {
+  volatile Resource limit;
+
+  // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
+  // config. This limit indicates how much we need to unreserve to allocate
+  // another container.
+  private volatile Resource amountNeededUnreserve;
+
   public ResourceLimits(Resource limit) {
+    this.amountNeededUnreserve = Resources.none();
     this.limit = limit;
   }
-  
-  volatile Resource limit;
+
+  public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
+    this.amountNeededUnreserve = amountNeededUnreserve;
+    this.limit = limit;
+  }
+
   public Resource getLimit() {
     return limit;
   }
-  
+
+  public Resource getAmountNeededUnreserve() {
+    return amountNeededUnreserve;
+  }
+
   public void setLimit(Resource limit) {
     this.limit = limit;
   }
-}
+
+  public void setAmountNeededUnreserve(Resource amountNeededUnreserve) {
+    this.amountNeededUnreserve = amountNeededUnreserve;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84f7641f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 4e53060..0c9bff9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -453,54 +453,53 @@ public abstract class AbstractCSQueue implements CSQueue {
       Resource currentLimitResource =
           getCurrentLimitResource(label, clusterResource, currentResourceLimits);
 
-      // if reservation continous looking enabled, check to see if could we
-      // potentially use this node instead of a reserved node if the application
-      // has reserved containers.
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking
-          && label.equals(RMNodeLabelsManager.NO_LABEL)
-          && Resources.greaterThan(resourceCalculator, clusterResource,
-              resourceCouldBeUnreserved, Resources.none())) {
-        // resource-without-reserved = used - reserved
-        Resource newTotalWithoutReservedResource =
-            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
-        
-        // when total-used-without-reserved-resource < currentLimit, we still
-        // have chance to allocate on this node by unreserving some containers
-        if (Resources.lessThan(resourceCalculator, clusterResource,
-            newTotalWithoutReservedResource, currentLimitResource)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("try to use reserved: " + getQueueName()
-                + " usedResources: " + queueUsage.getUsed()
-                + ", clusterResources: " + clusterResource
-                + ", reservedResources: " + resourceCouldBeUnreserved
-                + ", capacity-without-reserved: "
-                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
-                + currentLimitResource); 
-          }
-          return true;
-        }
-      }
-      
-      // Otherwise, if any of the label of this node beyond queue limit, we
-      // cannot allocate on this node. Consider a small epsilon here.
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           newTotalResource, currentLimitResource)) {
-        return false;
-      }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + "Check assign to queue, label=" + label
-            + " usedResources: " + queueUsage.getUsed(label)
-            + " clusterResources: " + clusterResource
-            + " currentUsedCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
-                queueUsage.getUsed(label),
-                labelManager.getResourceByLabel(label, clusterResource))
-            + " max-capacity: "
-            + queueCapacities.getAbsoluteMaximumCapacity(label)
-            + ")");
+        // if reservation continous looking enabled, check to see if could we
+        // potentially use this node instead of a reserved node if the application
+        // has reserved containers.
+        // TODO, now only consider reservation cases when the node has no label
+        if (this.reservationsContinueLooking
+            && label.equals(RMNodeLabelsManager.NO_LABEL)
+            && Resources.greaterThan(resourceCalculator, clusterResource,
+            resourceCouldBeUnreserved, Resources.none())) {
+          // resource-without-reserved = used - reserved
+          Resource newTotalWithoutReservedResource =
+              Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+          // when total-used-without-reserved-resource < currentLimit, we still
+          // have chance to allocate on this node by unreserving some containers
+          if (Resources.lessThan(resourceCalculator, clusterResource,
+              newTotalWithoutReservedResource, currentLimitResource)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("try to use reserved: " + getQueueName()
+                  + " usedResources: " + queueUsage.getUsed()
+                  + ", clusterResources: " + clusterResource
+                  + ", reservedResources: " + resourceCouldBeUnreserved
+                  + ", capacity-without-reserved: "
+                  + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+                  + currentLimitResource);
+            }
+            currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
+                currentLimitResource));
+            return true;
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getQueueName()
+              + "Check assign to queue, label=" + label
+              + " usedResources: " + queueUsage.getUsed(label)
+              + " clusterResources: " + clusterResource
+              + " currentUsedCapacity "
+              + Resources.divide(resourceCalculator, clusterResource,
+              queueUsage.getUsed(label),
+              labelManager.getResourceByLabel(label, clusterResource))
+              + " max-capacity: "
+              + queueCapacities.getAbsoluteMaximumCapacity(label)
+              + ")");
+        }
+        return false;
       }
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84f7641f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index fa0e280..65a7472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -119,7 +119,7 @@ public class LeafQueue extends AbstractCSQueue {
   private final QueueResourceLimitsInfo queueResourceLimitsInfo =
       new QueueResourceLimitsInfo();
   
-  private volatile ResourceLimits currentResourceLimits = null;
+  private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -149,7 +149,7 @@ public class LeafQueue extends AbstractCSQueue {
     this.lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
-    this.currentResourceLimits = new ResourceLimits(clusterResource);
+    this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
     
     // Initialize headroom info, also used for calculating application 
     // master resource limits.  Since this happens during queue initialization
@@ -820,13 +820,13 @@ public class LeafQueue extends AbstractCSQueue {
           
           // Check queue max-capacity limit
           if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
-              this.currentResourceLimits, required, application.getCurrentReservation()))
{
+              currentResourceLimits, required, application.getCurrentReservation())) {
             return NULL_ASSIGNMENT;
           }
 
           // Check user limit
           if (!assignToUser(clusterResource, application.getUser(), userLimit,
-              application, true, requestedNodeLabels)) {
+              application, requestedNodeLabels, currentResourceLimits)) {
             break;
           }
 
@@ -836,7 +836,7 @@ public class LeafQueue extends AbstractCSQueue {
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
-                null);
+                null, currentResourceLimits);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -897,7 +897,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Try to assign if we have sufficient resources
     assignContainersOnNode(clusterResource, node, application, priority, 
-        rmContainer);
+        rmContainer, new ResourceLimits(Resources.none()));
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -943,7 +943,7 @@ public class LeafQueue extends AbstractCSQueue {
   private void setQueueResourceLimitsInfo(
       Resource clusterResource) {
     synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+      queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
           .getLimit());
       queueResourceLimitsInfo.setClusterResource(clusterResource);
     }
@@ -964,13 +964,13 @@ public class LeafQueue extends AbstractCSQueue {
     setQueueResourceLimitsInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, currentResourceLimits.getLimit(),
+        getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
             clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxAvailRes=" + currentResourceLimits.getLimit() + 
+          " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
@@ -1078,7 +1078,7 @@ public class LeafQueue extends AbstractCSQueue {
   @Private
   protected synchronized boolean assignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
-      boolean checkReservations, Set<String> requestLabels) {
+      Set<String> requestLabels, ResourceLimits currentResoureLimits) {
     User user = getUser(userName);
     
     String label = CommonNodeLabelsManager.NO_LABEL;
@@ -1094,12 +1094,12 @@ public class LeafQueue extends AbstractCSQueue {
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking && checkReservations) {
+      if (this.reservationsContinueLooking) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getUsed(),
-                application.getCurrentReservation()), limit)) {
+            Resources.subtract(user.getUsed(), application.getCurrentReservation()),
+            limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
@@ -1107,6 +1107,13 @@ public class LeafQueue extends AbstractCSQueue {
                 + user.getUsed() + " reserved: "
                 + application.getCurrentReservation() + " limit: " + limit);
           }
+          Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(label), limit);
+          // we can only acquire a new container if we unreserve first since we ignored the
+          // user limit. Choose the max of user limit or what was previously set by max
+          // capacity.
+          currentResoureLimits.setAmountNeededUnreserve(Resources.max(resourceCalculator,
+              clusterResource, currentResoureLimits.getAmountNeededUnreserve(),
+              amountNeededToUnreserve));
           return true;
         }
       }
@@ -1153,7 +1160,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, ResourceLimits currentResoureLimits) {
     Resource assigned = Resources.none();
 
     NodeType requestType = null;
@@ -1166,7 +1173,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, currentResoureLimits);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
 
@@ -1194,7 +1201,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, currentResoureLimits);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
 
@@ -1222,7 +1229,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, currentResoureLimits);
 
       // update locality statistics
       if (allocatedContainer.getValue() != null) {
@@ -1233,20 +1240,11 @@ public class LeafQueue extends AbstractCSQueue {
     
     return SKIP_ASSIGNMENT;
   }
-  
-  private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
-    // First we need to get minimum resource we need unreserve
-    // minimum-resource-need-unreserve = used + asked - limit
-    Resource minimumUnreservedResource =
-        Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
-            currentResourceLimits.getLimit());
-    return minimumUnreservedResource;
-  }
 
   @Private
   protected boolean findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource askedResource, Resource minimumUnreservedResource) {
+      Resource minimumUnreservedResource) {
     // need to unreserve some other container first
     NodeId idToUnreserve =
         application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
@@ -1267,7 +1265,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("unreserving for app: " + application.getApplicationId()
         + " on nodeId: " + idToUnreserve
         + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + askedResource);
+        + node.getNodeID() + " needing: " + minimumUnreservedResource);
     }
 
     // headroom
@@ -1286,45 +1284,16 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
-  @Private
-  protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability) {
-    // we can't reserve if we got here based on the limit
-    // checks assuming we could unreserve!!!
-    Resource userLimit = computeUserLimitAndSetHeadroom(application,
-        clusterResource, capability, null);
-
-    // Check queue max-capacity limit,
-    // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, null,
-        this.currentResourceLimits, capability, Resources.none())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("was going to reserve but hit queue limit");
-      }
-      return false;
-    }
-
-    // Check user limit
-    if (!assignToUser(clusterResource, application.getUser(), userLimit,
-        application, false, null)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("was going to reserve but hit user limit");
-      }
-      return false;
-    }
-    return true;
-  }
-
-
   private Resource assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      ResourceLimits currentResoureLimits) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, currentResoureLimits);
     }
     
     return Resources.none();
@@ -1333,12 +1302,13 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignRackLocalContainers(Resource clusterResource,
       ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      ResourceLimits currentResoureLimits) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, currentResoureLimits);
     }
     
     return Resources.none();
@@ -1347,12 +1317,13 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignOffSwitchContainers(Resource clusterResource,
       ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      ResourceLimits currentResoureLimits) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, currentResoureLimits);
     }
     
     return Resources.none();
@@ -1436,7 +1407,7 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer) {
+      MutableObject createdContainer, ResourceLimits currentResoureLimits) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
@@ -1488,6 +1459,10 @@ public class LeafQueue extends AbstractCSQueue {
     // Can we allocate a container on this node?
     int availableContainers = 
         resourceCalculator.computeAvailableContainers(available, capability);
+
+    boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
+        currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
+
     if (availableContainers > 0) {
       // Allocate...
 
@@ -1496,20 +1471,24 @@ public class LeafQueue extends AbstractCSQueue {
         unreserve(application, priority, node, rmContainer);
       } else if (this.reservationsContinueLooking && node.getLabels().isEmpty())
{
         // when reservationsContinueLooking is set, we may need to unreserve
-        // some containers to meet this queue and its parents' resource limits
+        // some containers to meet this queue, its parents', or the users' resource limits.
         // TODO, need change here when we want to support continuous reservation
         // looking for labeled partitions.
-        Resource minimumUnreservedResource =
-            getMinimumResourceNeedUnreserved(capability);
-        if (!shouldAllocOrReserveNewContainer
-            || Resources.greaterThan(resourceCalculator, clusterResource,
-                minimumUnreservedResource, Resources.none())) {
+        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
+          // If we shouldn't allocate/reserve new container then we should unreserve one
the same
+          // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
+          // could be zero. If the limit was hit then use the amount we need to unreserve
to be
+          // under the limit.
+          Resource amountToUnreserve = capability;
+          if (needToUnreserve) {
+            amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
+          }
           boolean containerUnreserved =
               findNodeToUnreserve(clusterResource, node, application, priority,
-                  capability, minimumUnreservedResource);
-          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved

+                  amountToUnreserve);
+          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
           // container (That means we *have to* unreserve some resource to
-          // continue)). If we failed to unreserve some resource,
+          // continue)). If we failed to unreserve some resource, we can't continue.
           if (!containerUnreserved) {
             return Resources.none();
           }
@@ -1541,13 +1520,13 @@ public class LeafQueue extends AbstractCSQueue {
       if (shouldAllocOrReserveNewContainer || rmContainer != null) {
 
         if (reservationsContinueLooking && rmContainer == null) {
-          // we could possibly ignoring parent queue capacity limits when
-          // reservationsContinueLooking is set.
-          // If we're trying to reserve a container here, not container will be
-          // unreserved for reserving the new one. Check limits again before
-          // reserve the new container
-          if (!checkLimitsToReserve(clusterResource, 
-              application, capability)) {
+          // we could possibly ignoring queue capacity or user limits when
+          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
+          // one.
+          if (needToUnreserve) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("we needed to unreserve to be able to allocate");
+            }
             return Resources.none();
           }
         }
@@ -1679,8 +1658,8 @@ public class LeafQueue extends AbstractCSQueue {
     user.releaseContainer(resource, nodeLabels);
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
       
-    LOG.info(getQueueName() + 
-        " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + 
+    LOG.info(getQueueName() +
+        " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
         " user=" + userName + " user-resources=" + user.getUsed());
   }
   
@@ -1697,14 +1676,14 @@ public class LeafQueue extends AbstractCSQueue {
     // Even if ParentQueue will set limits respect child's max queue capacity,
     // but when allocating reserved container, CapacityScheduler doesn't do
     // this. So need cap limits by queue's max capacity here.
-    this.currentResourceLimits = currentResourceLimits;
+    this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
     Resource queueMaxResource =
         Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
             .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
             queueCapacities
                 .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
             minimumAllocation);
-    this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
         clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84f7641f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index c5b7587..826a5f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -748,14 +748,14 @@ public class TestReservations {
 
     // nothing reserved
     boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
-        node_1, app_0, priorityMap, capability, capability);
+        node_1, app_0, priorityMap, capability);
     assertFalse(res);
 
     // reserved but scheduler doesn't know about that node.
     app_0.reserve(node_1, priorityMap, rmContainer, container);
     node_1.reserveResource(app_0, priorityMap, rmContainer);
     res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
-        priorityMap, capability, capability);
+        priorityMap, capability);
     assertFalse(res);
   }
 
@@ -858,11 +858,13 @@ public class TestReservations {
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
     Resource capability = Resources.createResource(32 * GB, 0);
+    ResourceLimits limits = new ResourceLimits(clusterResource);
     boolean res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none());
     assertFalse(res);
+    assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
+
 
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
@@ -879,36 +881,42 @@ public class TestReservations {
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
 
     capability = Resources.createResource(5 * GB, 0);
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources
+            CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources
                 .createResource(5 * GB));
     assertTrue(res);
+    // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have
to
+    // unreserve 2GB to get the total 5GB needed.
+    // also note vcore checks not enabled
+    assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve());
 
     // tell to not check reservations
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none());
     assertFalse(res);
+    assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
+
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
 
-    // should return false no matter what checkReservations is passed
-    // in since feature is off
+    // should return false since reservations continue look is off.
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources.none());
     assertFalse(res);
-
+    assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources
+            CommonNodeLabelsManager.EMPTY_STRING_SET, limits, capability, Resources
                 .createResource(5 * GB));
     assertFalse(res);
+    assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
   }
 
   public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
@@ -1056,22 +1064,31 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getUsedResource().getMemory());
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
 
-    // set limit so subtrace reservations it can continue
-    Resource limit = Resources.createResource(12 * GB, 0);
-    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
-        true, null);
+    // not over the limit
+    Resource limit = Resources.createResource(14 * GB, 0);
+    ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
+    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits);
     assertTrue(res);
+    assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
 
-    // tell it not to check for reservations and should fail as already over
-    // limit
-    res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null);
-    assertFalse(res);
+    // set limit so it subtracts reservations and it can continue
+    limit = Resources.createResource(12 * GB, 0);
+    userResourceLimits = new ResourceLimits(clusterResource);
+    res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits);
+    assertTrue(res);
+    // limit set to 12GB, we are using 13GB (8 allocated,  5 reserved), to get under limit
+    // we need to unreserve 1GB
+    // also note vcore checks not enabled
+    assertEquals(Resources.createResource(1 * GB, 4),
+        userResourceLimits.getAmountNeededUnreserve());
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
+    userResourceLimits = new ResourceLimits(clusterResource);
 
     // should now return false since feature off
-    res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null);
+    res = a.assignToUser(clusterResource, user_0, limit, app_0, null, userResourceLimits);
     assertFalse(res);
+    assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
   }
 
   @Test


Mime
View raw message