hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [01/50] [abbrv] hadoop git commit: YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation
Date Mon, 27 Apr 2015 21:26:04 GMT
Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 fa5cc7524 -> 277d5fd2c


YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ef9bcb4b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ef9bcb4b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ef9bcb4b

Branch: refs/heads/YARN-2928
Commit: ef9bcb4ba17121990bcd5c9b3d428431894196db
Parents: a4dc6af
Author: tgraves <tgraves@apache.org>
Authored: Thu Apr 23 14:39:25 2015 +0000
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Mon Apr 27 13:35:22 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/ResourceLimits.java               |  28 +++-
 .../scheduler/capacity/AbstractCSQueue.java     |  94 +++++------
 .../scheduler/capacity/LeafQueue.java           | 162 ++++++++-----------
 .../scheduler/capacity/TestReservations.java    |  65 +++++---
 5 files changed, 186 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef9bcb4b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index de794e5..020fa29 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -316,6 +316,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3495. Confusing log generated by FairScheduler.
     (Brahma Reddy Battula via ozawa)
 
+    YARN-3434. Interaction between reservations and userlimit can result in 
+    significant ULF violation (tgraves)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef9bcb4b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 12333e8..8074794 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -19,22 +19,44 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Resource limits for queues/applications, this means max overall (please note
  * that, it's not "extra") resource you can get.
  */
 public class ResourceLimits {
+  volatile Resource limit;
+
+  // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
+  // config. This limit indicates how much we need to unreserve to allocate
+  // another container.
+  private volatile Resource amountNeededUnreserve;
+
   public ResourceLimits(Resource limit) {
+    this.amountNeededUnreserve = Resources.none();
     this.limit = limit;
   }
-  
-  volatile Resource limit;
+
+  public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
+    this.amountNeededUnreserve = amountNeededUnreserve;
+    this.limit = limit;
+  }
+
   public Resource getLimit() {
     return limit;
   }
-  
+
+  public Resource getAmountNeededUnreserve() {
+    return amountNeededUnreserve;
+  }
+
   public void setLimit(Resource limit) {
     this.limit = limit;
   }
+
+  public void setAmountNeededUnreserve(Resource amountNeededUnreserve) {
+    this.amountNeededUnreserve = amountNeededUnreserve;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef9bcb4b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9233e01..47cea19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -85,7 +85,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
   // etc.
   QueueCapacities queueCapacities;
-  
+
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
   protected CapacitySchedulerContext csContext;
@@ -473,55 +473,55 @@ public abstract class AbstractCSQueue implements CSQueue {
         getCurrentLimitResource(nodePartition, clusterResource,
             currentResourceLimits, schedulingMode);
 
-    // if reservation continous looking enabled, check to see if could we
-    // potentially use this node instead of a reserved node if the application
-    // has reserved containers.
-    // TODO, now only consider reservation cases when the node has no label
-    if (this.reservationsContinueLooking
-        && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
-        && Resources.greaterThan(resourceCalculator, clusterResource,
-            resourceCouldBeUnreserved, Resources.none())) {
-      // resource-without-reserved = used - reserved
-      Resource newTotalWithoutReservedResource =
-          Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
-
-      // when total-used-without-reserved-resource < currentLimit, we still
-      // have chance to allocate on this node by unreserving some containers
-      if (Resources.lessThan(resourceCalculator, clusterResource,
-          newTotalWithoutReservedResource, currentLimitResource)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("try to use reserved: " + getQueueName()
-              + " usedResources: " + queueUsage.getUsed()
-              + ", clusterResources: " + clusterResource
-              + ", reservedResources: " + resourceCouldBeUnreserved
-              + ", capacity-without-reserved: "
-              + newTotalWithoutReservedResource + ", maxLimitCapacity: "
-              + currentLimitResource);
-        }
-        return true;
-      }
-    }
-
-    // Check if we over current-resource-limit computed.
     if (Resources.greaterThan(resourceCalculator, clusterResource,
         newTotalResource, currentLimitResource)) {
-      return false;
-    }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getQueueName()
-          + "Check assign to queue, nodePartition="
-          + nodePartition
-          + " usedResources: "
-          + queueUsage.getUsed(nodePartition)
-          + " clusterResources: "
-          + clusterResource
-          + " currentUsedCapacity "
-          + Resources.divide(resourceCalculator, clusterResource,
-              queueUsage.getUsed(nodePartition),
-              labelManager.getResourceByLabel(nodePartition, clusterResource))
-          + " max-capacity: "
-          + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+      // if reservation continous looking enabled, check to see if could we
+      // potentially use this node instead of a reserved node if the application
+      // has reserved containers.
+      // TODO, now only consider reservation cases when the node has no label
+      if (this.reservationsContinueLooking
+          && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
+          && Resources.greaterThan(resourceCalculator, clusterResource,
+              resourceCouldBeUnreserved, Resources.none())) {
+        // resource-without-reserved = used - reserved
+        Resource newTotalWithoutReservedResource =
+            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+        // when total-used-without-reserved-resource < currentLimit, we still
+        // have chance to allocate on this node by unreserving some containers
+        if (Resources.lessThan(resourceCalculator, clusterResource,
+            newTotalWithoutReservedResource, currentLimitResource)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("try to use reserved: " + getQueueName()
+                + " usedResources: " + queueUsage.getUsed()
+                + ", clusterResources: " + clusterResource
+                + ", reservedResources: " + resourceCouldBeUnreserved
+                + ", capacity-without-reserved: "
+                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+                + currentLimitResource);
+          }
+          currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
+            currentLimitResource));
+          return true;
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getQueueName()
+            + "Check assign to queue, nodePartition="
+            + nodePartition
+            + " usedResources: "
+            + queueUsage.getUsed(nodePartition)
+            + " clusterResources: "
+            + clusterResource
+            + " currentUsedCapacity "
+            + Resources.divide(resourceCalculator, clusterResource,
+                queueUsage.getUsed(nodePartition),
+                labelManager.getResourceByLabel(nodePartition, clusterResource))
+            + " max-capacity: "
+            + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+      }
+      return false;
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef9bcb4b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 69e7e53..22aafaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -119,8 +119,8 @@ public class LeafQueue extends AbstractCSQueue {
   private final QueueResourceLimitsInfo queueResourceLimitsInfo =
       new QueueResourceLimitsInfo();
   
-  private volatile ResourceLimits currentResourceLimits = null;
-  
+  private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
+
   private OrderingPolicy<FiCaSchedulerApp> 
     orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
   
@@ -151,7 +151,7 @@ public class LeafQueue extends AbstractCSQueue {
     this.lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
-    this.currentResourceLimits = new ResourceLimits(clusterResource);
+    this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
     
     // Initialize headroom info, also used for calculating application 
     // master resource limits.  Since this happens during queue initialization
@@ -715,14 +715,14 @@ public class LeafQueue extends AbstractCSQueue {
     activateApplications();
 
     LOG.info("Application removed -" +
-        " appId: " + application.getApplicationId() + 
-        " user: " + application.getUser() + 
+        " appId: " + application.getApplicationId() +
+        " user: " + application.getUser() +
         " queue: " + getQueueName() +
         " #user-pending-applications: " + user.getPendingApplications() +
         " #user-active-applications: " + user.getActiveApplications() +
         " #queue-pending-applications: " + getNumPendingApplications() +
         " #queue-active-applications: " + getNumActiveApplications()
-        );
+    );
   }
 
   private synchronized FiCaSchedulerApp getApplication(
@@ -854,18 +854,18 @@ public class LeafQueue extends AbstractCSQueue {
           //       before all higher priority ones are serviced.
           Resource userLimit = 
               computeUserLimitAndSetHeadroom(application, clusterResource, 
-                  required, node.getPartition(), schedulingMode);          
-          
+                  required, node.getPartition(), schedulingMode);
+
           // Check queue max-capacity limit
           if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-              this.currentResourceLimits, required,
+              currentResourceLimits, required,
               application.getCurrentReservation(), schedulingMode)) {
             return NULL_ASSIGNMENT;
           }
 
           // Check user limit
           if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-              application, true, node.getPartition())) {
+              application, node.getPartition(), currentResourceLimits)) {
             break;
           }
 
@@ -906,9 +906,9 @@ public class LeafQueue extends AbstractCSQueue {
           }
           
           // Try to schedule
-          CSAssignment assignment =  
-            assignContainersOnNode(clusterResource, node, application, priority, 
-                null, schedulingMode);
+          CSAssignment assignment =
+            assignContainersOnNode(clusterResource, node, application, priority,
+                null, schedulingMode, currentResourceLimits);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -975,7 +975,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Try to assign if we have sufficient resources
     CSAssignment tmp =
         assignContainersOnNode(clusterResource, node, application, priority,
-          rmContainer, schedulingMode);
+          rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -1026,7 +1026,7 @@ public class LeafQueue extends AbstractCSQueue {
   private void setQueueResourceLimitsInfo(
       Resource clusterResource) {
     synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+      queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
           .getLimit());
       queueResourceLimitsInfo.setClusterResource(clusterResource);
     }
@@ -1048,13 +1048,13 @@ public class LeafQueue extends AbstractCSQueue {
     setQueueResourceLimitsInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, currentResourceLimits.getLimit(),
+        getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
             clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxAvailRes=" + currentResourceLimits.getLimit() + 
+          " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
@@ -1169,7 +1169,7 @@ public class LeafQueue extends AbstractCSQueue {
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
-      boolean checkReservations, String nodePartition) {
+      String nodePartition, ResourceLimits currentResoureLimits) {
     User user = getUser(userName);
 
     // Note: We aren't considering the current request since there is a fixed
@@ -1180,13 +1180,13 @@ public class LeafQueue extends AbstractCSQueue {
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking && checkReservations
-          && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
+      if (this.reservationsContinueLooking &&
+          nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getUsed(),
-                application.getCurrentReservation()), limit)) {
+            Resources.subtract(user.getUsed(),application.getCurrentReservation()),
+            limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
@@ -1194,6 +1194,13 @@ public class LeafQueue extends AbstractCSQueue {
                 + user.getUsed() + " reserved: "
                 + application.getCurrentReservation() + " limit: " + limit);
           }
+          Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit);
+          // we can only acquire a new container if we unreserve first since we ignored the
+          // user limit. Choose the max of user limit or what was previously set by max
+          // capacity.
+          currentResoureLimits.setAmountNeededUnreserve(
+              Resources.max(resourceCalculator, clusterResource,
+                  currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve));
           return true;
         }
       }
@@ -1240,7 +1247,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, SchedulingMode schedulingMode) {
+      RMContainer reservedContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
 
     CSAssignment assigned;
 
@@ -1254,7 +1262,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer, schedulingMode);
+            allocatedContainer, schedulingMode, currentResoureLimits);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
         assigned.getResource(), Resources.none())) {
 
@@ -1283,7 +1291,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer, schedulingMode);
+            allocatedContainer, schedulingMode, currentResoureLimits);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
         assigned.getResource(), Resources.none())) {
 
@@ -1312,7 +1320,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
             node, application, priority, reservedContainer,
-            allocatedContainer, schedulingMode);
+            allocatedContainer, schedulingMode, currentResoureLimits);
 
       // update locality statistics
       if (allocatedContainer.getValue() != null) {
@@ -1324,19 +1332,11 @@ public class LeafQueue extends AbstractCSQueue {
     
     return SKIP_ASSIGNMENT;
   }
-  
-  private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
-    // First we need to get minimum resource we need unreserve
-    // minimum-resource-need-unreserve = used + asked - limit
-    return Resources.subtract(
-      Resources.add(queueUsage.getUsed(), askedResource),
-      currentResourceLimits.getLimit());
-  }
 
   @Private
   protected boolean findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource askedResource, Resource minimumUnreservedResource) {
+      Resource minimumUnreservedResource) {
     // need to unreserve some other container first
     NodeId idToUnreserve =
         application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
@@ -1357,7 +1357,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("unreserving for app: " + application.getApplicationId()
         + " on nodeId: " + idToUnreserve
         + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + askedResource);
+        + node.getNodeID() + " needing: " + minimumUnreservedResource);
     }
 
     // headroom
@@ -1376,47 +1376,16 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
-  @Private
-  protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability, String nodePartition,
-      SchedulingMode schedulingMode) {
-    // we can't reserve if we got here based on the limit
-    // checks assuming we could unreserve!!!
-    Resource userLimit = computeUserLimitAndSetHeadroom(application,
-        clusterResource, capability, nodePartition, schedulingMode);
-
-    // Check queue max-capacity limit,
-    // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL,
-        this.currentResourceLimits, capability, Resources.none(), schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("was going to reserve but hit queue limit");
-      }
-      return false;
-    }
-
-    // Check user limit
-    if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-        application, false, nodePartition)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("was going to reserve but hit user limit");
-      }
-      return false;
-    }
-    return true;
-  }
-
-
   private CSAssignment assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode);
+          allocatedContainer, schedulingMode, currentResoureLimits);
     }
 
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
@@ -1426,12 +1395,12 @@ public class LeafQueue extends AbstractCSQueue {
       ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode);
+          allocatedContainer, schedulingMode, currentResoureLimits);
     }
 
     return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
@@ -1441,12 +1410,12 @@ public class LeafQueue extends AbstractCSQueue {
       ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer, schedulingMode);
+          allocatedContainer, schedulingMode, currentResoureLimits);
     }
     
     return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
@@ -1529,7 +1498,8 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer, SchedulingMode schedulingMode) {
+      MutableObject createdContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
@@ -1573,13 +1543,17 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.warn("Couldn't get container for allocation!");
       return new CSAssignment(Resources.none(), type);
     }
-    
+
     boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
         application, priority, capability);
 
     // Can we allocate a container on this node?
     int availableContainers = 
         resourceCalculator.computeAvailableContainers(available, capability);
+
+    boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
+        currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
+
     if (availableContainers > 0) {
       // Allocate...
 
@@ -1588,20 +1562,24 @@ public class LeafQueue extends AbstractCSQueue {
         unreserve(application, priority, node, rmContainer);
       } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
         // when reservationsContinueLooking is set, we may need to unreserve
-        // some containers to meet this queue and its parents' resource limits
+        // some containers to meet this queue, its parents', or the users' resource limits.
         // TODO, need change here when we want to support continuous reservation
         // looking for labeled partitions.
-        Resource minimumUnreservedResource =
-            getMinimumResourceNeedUnreserved(capability);
-        if (!shouldAllocOrReserveNewContainer
-            || Resources.greaterThan(resourceCalculator, clusterResource,
-                minimumUnreservedResource, Resources.none())) {
+        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
+          // If we shouldn't allocate/reserve new container then we should unreserve one the same
+          // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
+          // could be zero. If the limit was hit then use the amount we need to unreserve to be
+          // under the limit.
+          Resource amountToUnreserve = capability;
+          if (needToUnreserve) {
+            amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
+          }
           boolean containerUnreserved =
               findNodeToUnreserve(clusterResource, node, application, priority,
-                  capability, minimumUnreservedResource);
+                  amountToUnreserve);
           // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
           // container (That means we *have to* unreserve some resource to
-          // continue)). If we failed to unreserve some resource,
+          // continue)). If we failed to unreserve some resource, we can't continue.
           if (!containerUnreserved) {
             return new CSAssignment(Resources.none(), type);
           }
@@ -1642,13 +1620,13 @@ public class LeafQueue extends AbstractCSQueue {
       if (shouldAllocOrReserveNewContainer || rmContainer != null) {
 
         if (reservationsContinueLooking && rmContainer == null) {
-          // we could possibly ignoring parent queue capacity limits when
-          // reservationsContinueLooking is set.
-          // If we're trying to reserve a container here, not container will be
-          // unreserved for reserving the new one. Check limits again before
-          // reserve the new container
-          if (!checkLimitsToReserve(clusterResource,
-              application, capability, node.getPartition(), schedulingMode)) {
+          // we could possibly ignoring queue capacity or user limits when
+          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
+          // one.
+          if (needToUnreserve) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("we needed to unreserve to be able to allocate");
+            }
             return new CSAssignment(Resources.none(), type);
           }
         }
@@ -1811,14 +1789,14 @@ public class LeafQueue extends AbstractCSQueue {
     // Even if ParentQueue will set limits respect child's max queue capacity,
     // but when allocating reserved container, CapacityScheduler doesn't do
     // this. So need cap limits by queue's max capacity here.
-    this.currentResourceLimits = currentResourceLimits;
+    this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
     Resource queueMaxResource =
         Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
             .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
             queueCapacities
                 .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
             minimumAllocation);
-    this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
         clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef9bcb4b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index fc546ee..44845cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -748,14 +748,14 @@ public class TestReservations {
 
     // nothing reserved
     boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
-        node_1, app_0, priorityMap, capability, capability);
+        node_1, app_0, priorityMap, capability);
     assertFalse(res);
 
     // reserved but scheduler doesn't know about that node.
     app_0.reserve(node_1, priorityMap, rmContainer, container);
     node_1.reserveResource(app_0, priorityMap, rmContainer);
     res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
-        priorityMap, capability, capability);
+        priorityMap, capability);
     assertFalse(res);
   }
 
@@ -858,12 +858,13 @@ public class TestReservations {
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
     Resource capability = Resources.createResource(32 * GB, 0);
+    ResourceLimits limits = new ResourceLimits(clusterResource);
     boolean res =
         a.canAssignToThisQueue(clusterResource,
-            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
-                clusterResource), capability, Resources.none(),
+            RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
+    assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
 
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
@@ -880,38 +881,43 @@ public class TestReservations {
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
 
     capability = Resources.createResource(5 * GB, 0);
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
-                clusterResource), capability, Resources.createResource(5 * GB),
+            RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertTrue(res);
+    // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to
+    // unreserve 2GB to get the total 5GB needed.
+    // also note vcore checks not enabled
+    assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve());
 
     // tell to not check reservations
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
-                clusterResource), capability, Resources.none(),
+            RMNodeLabelsManager.NO_LABEL,limits, capability, Resources.none(),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
+    assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
 
-    // should return false no matter what checkReservations is passed
-    // in since feature is off
+    // should return false since reservations continue look is off.
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
-                clusterResource), capability, Resources.none(),
+            RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
-
+    assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
+    limits = new ResourceLimits(clusterResource);
     res =
         a.canAssignToThisQueue(clusterResource,
-            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
-                clusterResource), capability, Resources.createResource(5 * GB),
+            RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
+    assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
   }
 
   public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
@@ -1059,22 +1065,33 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getUsedResource().getMemory());
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
 
-    // set limit so subtrace reservations it can continue
-    Resource limit = Resources.createResource(12 * GB, 0);
-    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
-        true, "");
+    // not over the limit
+    Resource limit = Resources.createResource(14 * GB, 0);
+    ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
+    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
     assertTrue(res);
+    assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
 
-    // tell it not to check for reservations and should fail as already over
-    // limit
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, "");
-    assertFalse(res);
+
+    // set limit so it subtracts reservations and it can continue
+    limit = Resources.createResource(12 * GB, 0);
+    userResourceLimits = new ResourceLimits(clusterResource);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
+             "", userResourceLimits);
+    assertTrue(res);
+    // limit set to 12GB, we are using 13GB (8 allocated,  5 reserved), to get under limit
+    // we need to unreserve 1GB
+    // also note vcore checks not enabled
+    assertEquals(Resources.createResource(1 * GB, 4),
+        userResourceLimits.getAmountNeededUnreserve());
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
+    userResourceLimits = new ResourceLimits(clusterResource);
 
     // should now return false since feature off
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, "");
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
     assertFalse(res);
+    assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
   }
 
   @Test


Mime
View raw message