hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [29/43] hadoop git commit: YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's available resource-limit from the parent queue. Contributed by Wangda Tan.
Date Tue, 03 Mar 2015 19:32:07 GMT
YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's available resource-limit from the parent queue. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/14dd647c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/14dd647c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/14dd647c

Branch: refs/heads/YARN-2928
Commit: 14dd647c556016d351f425ee956ccf800ccb9ce2
Parents: abac6eb
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Mon Mar 2 17:52:47 2015 -0800
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Mon Mar 2 17:52:47 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/ResourceLimits.java               |  40 +++
 .../scheduler/ResourceUsage.java                |  61 ++---
 .../scheduler/capacity/AbstractCSQueue.java     |  24 +-
 .../scheduler/capacity/CSQueue.java             |  11 +-
 .../scheduler/capacity/CSQueueUtils.java        |  48 ----
 .../capacity/CapacityHeadroomProvider.java      |  16 +-
 .../scheduler/capacity/CapacityScheduler.java   |  30 ++-
 .../scheduler/capacity/LeafQueue.java           | 131 +++++-----
 .../scheduler/capacity/ParentQueue.java         |  53 +++-
 .../yarn/server/resourcemanager/MockAM.java     |  11 +-
 .../scheduler/TestResourceUsage.java            |   2 +-
 .../capacity/TestApplicationLimits.java         |  32 +--
 .../scheduler/capacity/TestCSQueueUtils.java    | 250 -------------------
 .../capacity/TestCapacityScheduler.java         |  85 ++++++-
 .../scheduler/capacity/TestChildQueueOrder.java |  36 ++-
 .../scheduler/capacity/TestLeafQueue.java       | 221 ++++++++++------
 .../scheduler/capacity/TestParentQueue.java     | 106 ++++----
 .../scheduler/capacity/TestReservations.java    | 100 +++++---
 19 files changed, 646 insertions(+), 614 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d07aa26..0850f0b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -686,6 +686,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3270. Fix node label expression not getting set in 
     ApplicationSubmissionContext (Rohit Agarwal via wangda)
 
+    YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's
+    available resource-limit from the parent queue. (Wangda Tan via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
new file mode 100644
index 0000000..12333e8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Resource limits for queues/applications: the maximum overall resource that
+ * can be obtained (note that this is a total, not an "extra" amount).
+ */
+public class ResourceLimits {
+  public ResourceLimits(Resource limit) {
+    this.limit = limit;
+  }
+  
+  volatile Resource limit;
+  public Resource getLimit() {
+    return limit;
+  }
+  
+  public void setLimit(Resource limit) {
+    this.limit = limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index c651878..de44bbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -50,11 +50,12 @@ public class ResourceUsage {
     writeLock = lock.writeLock();
 
     usages = new HashMap<String, UsageByLabel>();
+    usages.put(NL, new UsageByLabel(NL));
   }
 
   // Usage enum here to make implement cleaner
   private enum ResourceType {
-    USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4);
+    USED(0), PENDING(1), AMUSED(2), RESERVED(3);
 
     private int idx;
 
@@ -71,7 +72,18 @@ public class ResourceUsage {
       resArr = new Resource[ResourceType.values().length];
       for (int i = 0; i < resArr.length; i++) {
         resArr[i] = Resource.newInstance(0, 0);
-      }
+      };
+    }
+    
+    @Override
+    public String toString() {
+      // resArr has exactly one slot per ResourceType (0-3); no headroom slot.
+      StringBuilder sb = new StringBuilder();
+      sb.append("{used=" + resArr[0] + "%, ");
+      sb.append("pending=" + resArr[1] + "%, ");
+      sb.append("am_used=" + resArr[2] + "%, ");
+      sb.append("reserved=" + resArr[3] + "%}");
+      return sb.toString();
     }
   }
 
@@ -181,41 +193,6 @@ public class ResourceUsage {
   }
 
   /*
-   * Headroom
-   */
-  public Resource getHeadroom() {
-    return getHeadroom(NL);
-  }
-
-  public Resource getHeadroom(String label) {
-    return _get(label, ResourceType.HEADROOM);
-  }
-
-  public void incHeadroom(String label, Resource res) {
-    _inc(label, ResourceType.HEADROOM, res);
-  }
-
-  public void incHeadroom(Resource res) {
-    incHeadroom(NL, res);
-  }
-
-  public void decHeadroom(Resource res) {
-    decHeadroom(NL, res);
-  }
-
-  public void decHeadroom(String label, Resource res) {
-    _dec(label, ResourceType.HEADROOM, res);
-  }
-
-  public void setHeadroom(Resource res) {
-    setHeadroom(NL, res);
-  }
-
-  public void setHeadroom(String label, Resource res) {
-    _set(label, ResourceType.HEADROOM, res);
-  }
-
-  /*
    * AM-Used
    */
   public Resource getAMUsed() {
@@ -309,4 +286,14 @@ public class ResourceUsage {
       writeLock.unlock();
     }
   }
+  
+  @Override
+  public String toString() {
+    readLock.lock(); // acquire before try so finally only unlocks a held lock
+    try {
+      return usages.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index eb7218b..d800709 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -40,9 +40,11 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.Sets;
 
@@ -52,7 +54,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   final String queueName;
   volatile int numContainers;
   
-  Resource minimumAllocation;
+  final Resource minimumAllocation;
   Resource maximumAllocation;
   QueueState state;
   final QueueMetrics metrics;
@@ -94,6 +96,7 @@ public abstract class AbstractCSQueue implements CSQueue {
             cs.getConf());
 
     this.csContext = cs;
+    this.minimumAllocation = csContext.getMinimumResourceCapability();
     
     // initialize ResourceUsage
     queueUsage = new ResourceUsage();
@@ -248,7 +251,6 @@ public abstract class AbstractCSQueue implements CSQueue {
     // After we setup labels, we can setup capacities
     setupConfigurableCapacities();
     
-    this.minimumAllocation = csContext.getMinimumResourceCapability();
     this.maximumAllocation =
         csContext.getConfiguration().getMaximumAllocationPerQueue(
             getQueuePath());
@@ -403,4 +405,22 @@ public abstract class AbstractCSQueue implements CSQueue {
     return csConf.getPreemptionDisabled(q.getQueuePath(),
                                         parentQ.getPreemptionDisabled());
   }
+  
+  protected Resource getCurrentResourceLimit(Resource clusterResource,
+      ResourceLimits currentResourceLimits) {
+    /*
+     * Queue's max available resource = min(my.max, my.limit), rounded down.
+     * my.limit is set by my parent, which accounts for my siblings' usage.
+     */
+    Resource queueMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
+            queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
+    Resource queueCurrentResourceLimit =
+        Resources.min(resourceCalculator, clusterResource, queueMaxResource,
+            currentResourceLimits.getLimit());
+    queueCurrentResourceLimit =
+        Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
+            minimumAllocation);
+    return queueCurrentResourceLimit;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 5cf38c1..0a60acc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -189,10 +190,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
    * @param needToUnreserve assign container only if it can unreserve one first
+   * @param resourceLimits how much overall resource this queue can use.
    * @return the assignment
    */
-  public CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve);
+  public CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits resourceLimits);
   
   /**
    * A container assigned to the queue has completed.
@@ -231,8 +234,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    /**
    * Update the cluster resource for queues as we add/remove nodes
    * @param clusterResource the current cluster resource
+   * @param resourceLimits the current ResourceLimits
    */
-  public void updateClusterResource(Resource clusterResource);
+  public void updateClusterResource(Resource clusterResource,
+      ResourceLimits resourceLimits);
   
   /**
    * Get the {@link ActiveUsersManager} for the queue.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 865b0b4..1921195 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -225,52 +225,4 @@ class CSQueueUtils {
             )
         );
    }
-
-   public static float getAbsoluteMaxAvailCapacity(
-      ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
-      CSQueue parent = queue.getParent();
-      if (parent == null) {
-        return queue.getAbsoluteMaximumCapacity();
-      }
-
-      //Get my parent's max avail, needed to determine my own
-      float parentMaxAvail = getAbsoluteMaxAvailCapacity(
-        resourceCalculator, clusterResource, parent);
-      //...and as a resource
-      Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
-
-      //check for no resources parent before dividing, if so, max avail is none
-      if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
-        return 0.0f;
-      }
-      //sibling used is parent used - my used...
-      float siblingUsedCapacity = Resources.ratio(
-                 resourceCalculator,
-                 Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
-                 parentResource);
-      //my max avail is the lesser of my max capacity and what is unused from my parent
-      //by my siblings (if they are beyond their base capacity)
-      float maxAvail = Math.min(
-        queue.getMaximumCapacity(),
-        1.0f - siblingUsedCapacity);
-      //and, mutiply by parent to get absolute (cluster relative) value
-      float absoluteMaxAvail = maxAvail * parentMaxAvail;
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("qpath " + queue.getQueuePath());
-        LOG.debug("parentMaxAvail " + parentMaxAvail);
-        LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
-        LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
-        LOG.debug("maxAvail " + maxAvail);
-        LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
-      }
-
-      if (absoluteMaxAvail < 0.0f) {
-        absoluteMaxAvail = 0.0f;
-      } else if (absoluteMaxAvail > 1.0f) {
-        absoluteMaxAvail = 1.0f;
-      }
-
-      return absoluteMaxAvail;
-   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index f79d195..c6524c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -26,32 +26,32 @@ public class CapacityHeadroomProvider {
   LeafQueue queue;
   FiCaSchedulerApp application;
   Resource required;
-  LeafQueue.QueueHeadroomInfo queueHeadroomInfo;
+  LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
   
   public CapacityHeadroomProvider(
     LeafQueue.User user,
     LeafQueue queue,
     FiCaSchedulerApp application,
     Resource required,
-    LeafQueue.QueueHeadroomInfo queueHeadroomInfo) {
+    LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
     
     this.user = user;
     this.queue = queue;
     this.application = application;
     this.required = required;
-    this.queueHeadroomInfo = queueHeadroomInfo;
+    this.queueResourceLimitsInfo = queueResourceLimitsInfo;
     
   }
   
   public Resource getHeadroom() {
     
-    Resource queueMaxCap;
+    Resource queueCurrentLimit;
     Resource clusterResource;
-    synchronized (queueHeadroomInfo) {
-      queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
-      clusterResource = queueHeadroomInfo.getClusterResource();
+    synchronized (queueResourceLimitsInfo) {
+      queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
+      clusterResource = queueResourceLimitsInfo.getClusterResource();
     }
-    Resource headroom = queue.getHeadroom(user, queueMaxCap, 
+    Resource headroom = queue.getHeadroom(user, queueCurrentLimit, 
       clusterResource, application, required);
     
     // Corner case to deal with applications being slightly over-limit

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6b9d846..28ce264 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -33,7 +34,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -84,12 +85,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -112,11 +117,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -499,7 +499,8 @@ public class CapacityScheduler extends
     initializeQueueMappings();
 
     // Re-calculate headroom for active applications
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
 
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     setQueueAcls(authorizer, queues);
@@ -990,7 +991,8 @@ public class CapacityScheduler extends
   private synchronized void updateNodeAndQueueResource(RMNode nm, 
       ResourceOption resourceOption) {
     updateNodeResource(nm, resourceOption);
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
   }
   
   /**
@@ -1060,7 +1062,8 @@ public class CapacityScheduler extends
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
       CSAssignment assignment = queue.assignContainers(clusterResource, node,
-          false);
+          false, new ResourceLimits(
+              clusterResource));
       
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
@@ -1084,7 +1087,8 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
-        root.assignContainers(clusterResource, node, false);
+        root.assignContainers(clusterResource, node, false, new ResourceLimits(
+            clusterResource));
       }
     } else {
       LOG.info("Skipping scheduling since node " + node.getNodeID() + 
@@ -1205,7 +1209,8 @@ public class CapacityScheduler extends
         usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     int numNodes = numNodeManagers.incrementAndGet();
     updateMaximumAllocation(schedulerNode, true);
     
@@ -1234,7 +1239,8 @@ public class CapacityScheduler extends
       return;
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     int numNodes = numNodeManagers.decrementAndGet();
 
     if (scheduleAsynchronously && numNodes == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 38d4712..3910ac8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -115,7 +116,10 @@ public class LeafQueue extends AbstractCSQueue {
   // absolute capacity as a resource (based on cluster resource)
   private Resource absoluteCapacityResource = Resources.none();
   
-  private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
+  private final QueueResourceLimitsInfo queueResourceLimitsInfo =
+      new QueueResourceLimitsInfo();
+  
+  private volatile ResourceLimits currentResourceLimits = null;
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -145,13 +149,14 @@ public class LeafQueue extends AbstractCSQueue {
     this.lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
+    this.currentResourceLimits = new ResourceLimits(clusterResource);
+    
     // Initialize headroom info, also used for calculating application 
     // master resource limits.  Since this happens during queue initialization
     // and all queues may not be realized yet, we'll use (optimistic) 
     // absoluteMaxCapacity (it will be replaced with the more accurate 
     // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    updateHeadroomInfo(clusterResource,
-        queueCapacities.getAbsoluteMaximumCapacity());
+    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     userLimit = conf.getUserLimit(getQueuePath());
@@ -544,12 +549,12 @@ public class LeafQueue extends AbstractCSQueue {
       * become busy.
       *
       */
-     Resource queueMaxCap;
-     synchronized (queueHeadroomInfo) {
-       queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+     Resource queueCurrentLimit;
+     synchronized (queueResourceLimitsInfo) {
+       queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
      }
      Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
-       absoluteCapacityResource, queueMaxCap);
+       absoluteCapacityResource, queueCurrentLimit);
      return Resources.multiplyAndNormalizeUp( 
           resourceCalculator,
           queueCap, 
@@ -733,8 +738,10 @@ public class LeafQueue extends AbstractCSQueue {
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve) {
-
+      FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits currentResourceLimits) {
+    this.currentResourceLimits = currentResourceLimits;
+    
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " #applications=" + activeApplications.size());
@@ -876,9 +883,9 @@ public class LeafQueue extends AbstractCSQueue {
 
   }
 
-  private synchronized CSAssignment 
-  assignReservedContainer(FiCaSchedulerApp application, 
-      FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+  private synchronized CSAssignment assignReservedContainer(
+      FiCaSchedulerApp application, FiCaSchedulerNode node,
+      RMContainer rmContainer, Resource clusterResource) {
     // Do we still need this reservation?
     Priority priority = rmContainer.getReservedPriority();
     if (application.getTotalRequiredResources(priority) == 0) {
@@ -895,13 +902,13 @@ public class LeafQueue extends AbstractCSQueue {
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
   }
   
-  protected Resource getHeadroom(User user, Resource queueMaxCap,
+  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application, Resource required) {
-    return getHeadroom(user, queueMaxCap, clusterResource,
+    return getHeadroom(user, queueCurrentLimit, clusterResource,
 	  computeUserLimit(application, clusterResource, required, user, null));
   }
   
-  private Resource getHeadroom(User user, Resource queueMaxCap,
+  private Resource getHeadroom(User user, Resource currentResourceLimit,
       Resource clusterResource, Resource userLimit) {
     /** 
      * Headroom is:
@@ -923,8 +930,11 @@ public class LeafQueue extends AbstractCSQueue {
     Resource headroom = 
       Resources.min(resourceCalculator, clusterResource,
         Resources.subtract(userLimit, user.getUsed()),
-        Resources.subtract(queueMaxCap, queueUsage.getUsed())
+        Resources.subtract(currentResourceLimit, queueUsage.getUsed())
         );
+    // Normalize it before return
+    headroom =
+        Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
     return headroom;
   }
 
@@ -1012,23 +1022,17 @@ public class LeafQueue extends AbstractCSQueue {
     return canAssign;
   }
   
-  private Resource updateHeadroomInfo(Resource clusterResource, 
-      float absoluteMaxAvailCapacity) {
-  
-    Resource queueMaxCap = 
-      Resources.multiplyAndNormalizeDown(
-          resourceCalculator, 
-          clusterResource, 
-          absoluteMaxAvailCapacity,
-          minimumAllocation);
-
-    synchronized (queueHeadroomInfo) {
-      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
-      queueHeadroomInfo.setClusterResource(clusterResource);
-    }
-    
-    return queueMaxCap;
+  private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+      Resource clusterResource) {
+    Resource queueCurrentResourceLimit =
+        getCurrentResourceLimit(clusterResource, currentResourceLimits);
     
+    synchronized (queueResourceLimitsInfo) {
+      queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+      queueResourceLimitsInfo.setClusterResource(clusterResource);
+    }
+
+    return queueCurrentResourceLimit;
   }
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1043,28 +1047,22 @@ public class LeafQueue extends AbstractCSQueue {
         computeUserLimit(application, clusterResource, required,
             queueUser, requestedLabels);
 
-    //Max avail capacity needs to take into account usage by ancestor-siblings
-    //which are greater than their base capacity, so we are interested in "max avail"
-    //capacity
-    float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, this);
-    
-    Resource queueMaxCap = 
-      updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity);
+    Resource currentResourceLimit =
+        computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
+        getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxCap=" + queueMaxCap + 
+          " queueMaxAvailRes=" + currentResourceLimit + 
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
     
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
-      queueUser, this, application, required, queueHeadroomInfo);
+      queueUser, this, application, required, queueResourceLimitsInfo);
     
     application.setHeadroomProvider(headroomProvider);
 
@@ -1249,7 +1247,7 @@ public class LeafQueue extends AbstractCSQueue {
         application.getResourceRequest(priority, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
       assigned = 
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
+          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
               node, application, priority, reservedContainer, needToUnreserve); 
       if (Resources.greaterThan(resourceCalculator, clusterResource, 
           assigned, Resources.none())) {
@@ -1265,8 +1263,8 @@ public class LeafQueue extends AbstractCSQueue {
         return SKIP_ASSIGNMENT;
       }
       
-      assigned = 
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
+      assigned =
+          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
               node, application, priority, reservedContainer, needToUnreserve);
       if (Resources.greaterThan(resourceCalculator, clusterResource, 
           assigned, Resources.none())) {
@@ -1282,10 +1280,10 @@ public class LeafQueue extends AbstractCSQueue {
         return SKIP_ASSIGNMENT;
       }
 
-      return new CSAssignment(
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-              node, application, priority, reservedContainer, needToUnreserve), 
-              NodeType.OFF_SWITCH);
+      return new CSAssignment(assignOffSwitchContainers(clusterResource,
+          offSwitchResourceRequest, node, application, priority,
+          reservedContainer, needToUnreserve),
+          NodeType.OFF_SWITCH);
     }
     
     return SKIP_ASSIGNMENT;
@@ -1373,7 +1371,7 @@ public class LeafQueue extends AbstractCSQueue {
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, boolean needToUnreserve) {
-    if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
+    if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
@@ -1383,9 +1381,9 @@ public class LeafQueue extends AbstractCSQueue {
     return Resources.none();
   }
 
-  private Resource assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
+  private Resource assignRackLocalContainers(Resource clusterResource,
+      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
         reservedContainer)) {
@@ -1397,9 +1395,9 @@ public class LeafQueue extends AbstractCSQueue {
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
+  private Resource assignOffSwitchContainers(Resource clusterResource,
+      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer, boolean needToUnreserve) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
         reservedContainer)) {
@@ -1753,15 +1751,16 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void updateClusterResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource,
+      ResourceLimits currentResourceLimits) {
+    this.currentResourceLimits = currentResourceLimits;
     lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
     // Update headroom info based on new cluster resource value
     // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
     // during allocation
-    updateHeadroomInfo(clusterResource,
-        queueCapacities.getAbsoluteMaximumCapacity());
+    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
@@ -1951,16 +1950,16 @@ public class LeafQueue extends AbstractCSQueue {
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand
    */
-  static class QueueHeadroomInfo {
-    private Resource queueMaxCap;
+  static class QueueResourceLimitsInfo {
+    private Resource queueCurrentLimit;
     private Resource clusterResource;
     
-    public void setQueueMaxCap(Resource queueMaxCap) {
-      this.queueMaxCap = queueMaxCap;
+    public void setQueueCurrentLimit(Resource currentLimit) {
+      this.queueCurrentLimit = currentLimit;
     }
     
-    public Resource getQueueMaxCap() {
-      return queueMaxCap;
+    public Resource getQueueCurrentLimit() {
+      return queueCurrentLimit;
     }
     
     public void setClusterResource(Resource clusterResource) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index a26b0aa..7feaa15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -378,8 +379,9 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized CSAssignment assignContainers(
-      Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
+  public synchronized CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits resourceLimits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     Set<String> nodeLabels = node.getLabels();
@@ -408,7 +410,8 @@ public class ParentQueue extends AbstractCSQueue {
       
       // Schedule
       CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
+          assignContainersToChildQueues(clusterResource, node,
+              localNeedToUnreserve | needToUnreserve, resourceLimits);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -530,8 +533,29 @@ public class ParentQueue extends AbstractCSQueue {
             node.getAvailableResource(), minimumAllocation);
   }
   
-  private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
-      FiCaSchedulerNode node, boolean needToUnreserve) {
+  private ResourceLimits getResourceLimitsOfChild(CSQueue child,
+      Resource clusterResource, ResourceLimits myLimits) {
+    /*
+     * Compute the resource limit (head-room) of a given child:
+     * limit = min(minimum-of-limit-of-this-queue-and-ancestors, this.max)
+     *         - this.used + child.used,
+     * so that no limit of this queue or of its ancestors is violated.
+     */
+    Resource myCurrentLimit =
+        getCurrentResourceLimit(clusterResource, myLimits);
+    // My available resource = my-current-limit - my-used-resource
+    Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
+        getUsedResources());
+    // Child's limit = my-available-resource + resource-already-used-by-child
+    Resource childLimit =
+        Resources.add(myMaxAvailableResource, child.getUsedResources());
+    
+    return new ResourceLimits(childLimit);
+  }
+  
+  private synchronized CSAssignment assignContainersToChildQueues(
+      Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
+      ResourceLimits limits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -544,7 +568,14 @@ public class ParentQueue extends AbstractCSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
       }
-      assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
+      
+      // Get ResourceLimits of child queue before assign containers
+      ResourceLimits childLimits =
+          getResourceLimitsOfChild(childQueue, cluster, limits);
+      
+      assignment =
+          childQueue.assignContainers(cluster, node, needToUnreserve,
+              childLimits);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 
@@ -638,10 +669,14 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void updateClusterResource(Resource clusterResource) {
+  public synchronized void updateClusterResource(Resource clusterResource,
+      ResourceLimits resourceLimits) {
     // Update all children
     for (CSQueue childQueue : childQueues) {
-      childQueue.updateClusterResource(clusterResource);
+      // Get ResourceLimits of child queue before updating its cluster resource
+      ResourceLimits childLimits =
+          getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits);     
+      childQueue.updateClusterResource(clusterResource, childLimits);
     }
     
     // Update metrics
@@ -728,4 +763,4 @@ public class ParentQueue extends AbstractCSQueue {
   public synchronized int getNumApplications() {
     return numApplications;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index e1b8a3d..494f5a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -23,14 +23,12 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
 
 public class MockAM {
 
@@ -53,6 +52,7 @@ public class MockAM {
   private RMContext context;
   private ApplicationMasterProtocol amRMProtocol;
   private UserGroupInformation ugi;
+  private volatile AllocateResponse lastResponse;
 
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
   private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -223,7 +223,8 @@ public class MockAM {
         context.getRMApps().get(attemptId.getApplicationId())
             .getRMAppAttempt(attemptId).getAMRMToken();
     ugi.addTokenIdentifier(token.decodeIdentifier());
-    return doAllocateAs(ugi, allocateRequest);
+    lastResponse = doAllocateAs(ugi, allocateRequest);
+    return lastResponse;
   }
 
   public AllocateResponse doAllocateAs(UserGroupInformation ugi,
@@ -240,6 +241,10 @@ public class MockAM {
       throw (Exception) e.getCause();
     }
   }
+  
+  public AllocateResponse doHeartbeat() throws Exception {
+    return allocate(null, null);
+  }
 
   public void unregisterAppAttempt() throws Exception {
     waitForState(RMAppAttemptState.RUNNING);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java
index b6dfacb..f0bf892 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java
@@ -38,7 +38,7 @@ public class TestResourceUsage {
   @Parameterized.Parameters
   public static Collection<String[]> getParameters() {
     return Arrays.asList(new String[][] { { "Pending" }, { "Used" },
-        { "Headroom" }, { "Reserved" }, { "AMUsed" } });
+        { "Reserved" }, { "AMUsed" } });
   }
 
   public TestResourceUsage(String suffix) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 81a5aad..8cad057 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -21,15 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,8 +37,8 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -53,9 +48,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -63,7 +59,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.Ignore;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 public class TestApplicationLimits {
   
@@ -171,7 +168,9 @@ public class TestApplicationLimits {
     // am limit is 4G initially (based on the queue absolute capacity)
     // when there is only 1 user, and drops to 2G (the userlimit) when there
     // is a second user
-    queue.updateClusterResource(Resource.newInstance(80 * GB, 40));
+    Resource clusterResource = Resource.newInstance(80 * GB, 40);
+    queue.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     
     ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
     when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
@@ -289,7 +288,8 @@ public class TestApplicationLimits {
     
     // Add some nodes to the cluster & test new limits
     clusterResource = Resources.createResource(120 * 16 * GB);
-    root.updateClusterResource(clusterResource);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
     
     assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1));
     assertEquals(queue.getUserAMResourceLimit(), 
@@ -611,7 +611,8 @@ public class TestApplicationLimits {
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false);
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
@@ -630,7 +631,8 @@ public class TestApplicationLimits {
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource)); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
     
@@ -649,7 +651,8 @@ public class TestApplicationLimits {
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());
@@ -657,7 +660,8 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
-    queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
+    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
deleted file mode 100644
index 5135ba9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Test;
-
-public class TestCSQueueUtils {
-
-  private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class);
-
-  final static int GB = 1024;
-
-  @Test
-  public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception {
-    runInvalidDivisorTest(false);
-    runInvalidDivisorTest(true);
-  }
-    
-  public void runInvalidDivisorTest(boolean useDominant) throws Exception {
-  
-    ResourceCalculator resourceCalculator;
-    Resource clusterResource;
-    if (useDominant) {
-      resourceCalculator = new DominantResourceCalculator();
-      clusterResource = Resources.createResource(10, 0);
-    } else {
-      resourceCalculator = new DefaultResourceCalculator();
-      clusterResource = Resources.createResource(0, 99);
-    }
-    
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-  
-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(0, 0));
-    RMContext rmContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(rmContext);
-  
-    final String L1Q1 = "L1Q1";
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
-    
-    final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
-    csConf.setCapacity(L1Q1P, 90);
-    csConf.setMaximumCapacity(L1Q1P, 90);
-    
-    ParentQueue root = new ParentQueue(csContext, 
-        CapacitySchedulerConfiguration.ROOT, null, null);
-    LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
-    
-    LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, root));
-    
-    LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1));
-    
-    assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1), 0.000001f);
-    
-  }
-  
-  @Test
-  public void testAbsoluteMaxAvailCapacityNoUse() throws Exception {
-    
-    ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
-    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32);
-    
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-    
-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(16*GB, 32));
-    RMContext rmContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(rmContext);
-    
-    final String L1Q1 = "L1Q1";
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
-    
-    final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
-    csConf.setCapacity(L1Q1P, 90);
-    csConf.setMaximumCapacity(L1Q1P, 90);
-    
-    ParentQueue root = new ParentQueue(csContext, 
-        CapacitySchedulerConfiguration.ROOT, null, null);
-    LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
-    
-    LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, root));
-    
-    LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1));
-    
-    assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, root), 0.000001f);
-    
-    assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l1q1), 0.000001f);
-    
-  }
-  
-  @Test
-  public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
-    
-    ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
-    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32);
-    
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
-    
-    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getClusterResource()).thenReturn(clusterResource);
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(16*GB, 32));
-    
-    RMContext rmContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(rmContext);
-    
-    final String L1Q1 = "L1Q1";
-    final String L1Q2 = "L1Q2";
-    final String L2Q1 = "L2Q1";
-    final String L2Q2 = "L2Q2";
-    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2,
-                     L2Q1, L2Q2});
-    
-    final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
-    csConf.setCapacity(L1Q1P, 80);
-    csConf.setMaximumCapacity(L1Q1P, 80);
-    
-    final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2;
-    csConf.setCapacity(L1Q2P, 20);
-    csConf.setMaximumCapacity(L1Q2P, 100);
-    
-    final String L2Q1P = L1Q1P + "." + L2Q1;
-    csConf.setCapacity(L2Q1P, 50);
-    csConf.setMaximumCapacity(L2Q1P, 50);
-    
-    final String L2Q2P = L1Q1P + "." + L2Q2;
-    csConf.setCapacity(L2Q2P, 50);
-    csConf.setMaximumCapacity(L2Q2P, 50);
-    
-    float result;
-    
-    ParentQueue root = new ParentQueue(csContext, 
-        CapacitySchedulerConfiguration.ROOT, null, null);
-    
-    LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
-    LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null);
-    LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null);
-    LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null);
-    
-    //no usage, all based on maxCapacity (prior behavior)
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.4f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //some usage, but below the base capacity
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.4f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //usage gt base on parent sibling
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
-    l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.3f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //same as last, but with usage also on direct parent
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.3f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //add to direct sibling, below the threshold of effect at present
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.3f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    //add to direct sibling, now above the threshold of effect
-    //(it's cumulative with prior tests)
-    root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
-    result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
-      resourceCalculator, clusterResource, l2q2);
-    assertEquals( 0.1f, result, 0.000001f);
-    LOG.info("t2 l2q2 " + result);
-    
-    
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index fabf47d..83ab104 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -359,7 +360,8 @@ public class TestCapacityScheduler {
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
   
-  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+  private CapacitySchedulerConfiguration setupQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
     
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
@@ -383,6 +385,7 @@ public class TestCapacityScheduler {
     conf.setUserLimitFactor(B3, 100.0f);
 
     LOG.info("Setup top-level queues a and b");
+    return conf;
   }
   
   @Test
@@ -2400,6 +2403,86 @@ public class TestCapacityScheduler {
     assertEquals("queue B2 max vcores allocation", 12,
         ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
   }
+  
+  private void waitContainerAllocated(MockAM am, int mem, int nContainer,
+      int startContainerId, MockRM rm, MockNM nm) throws Exception {
+    for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) {
+      am.allocate("*", mem, 1, new ArrayList<ContainerId>());
+      ContainerId containerId =
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
+      Assert.assertTrue(rm.waitForState(nm, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    }
+  }
+
+  @Test
+  public void testHierarchyQueuesCurrentLimits() throws Exception {
+    /*
+     * Queue tree:
+     *          Root
+     *        /     \
+     *       A       B
+     *      / \    / | \
+     *     A1 A2  B1 B2 B3
+     */
+    YarnConfiguration conf =
+        new YarnConfiguration(
+            setupQueueConfiguration(new CapacitySchedulerConfiguration()));
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    
+    waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1);
+
+    // Maximum resource of b1 is 100 * 0.895 * 0.792 = 71 GB
+    // 2 GB used by app1 (1 GB AM + one 1 GB container), so it's 71 - 2 = 69 GB.
+    Assert.assertEquals(69 * GB,
+        am1.doHeartbeat().getAvailableResources().getMemory());
+    
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    
+    // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total)
+    waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1);
+    
+    // Allocate one more container with 1 GB resource in b1
+    waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1);
+    
+    // Total is 100 GB, 
+    // B2 uses 41 GB (5 * 8GB containers and 1 AM container)
+    // B1 uses 3 GB (2 * 1GB containers and 1 AM container)
+    // Available is 100 - 41 - 3 = 56 GB
+    Assert.assertEquals(56 * GB,
+        am1.doHeartbeat().getAvailableResources().getMemory());
+    
+    // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom
+    // of app1 (in queue b1) is updated correctly
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+    
+    // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total)
+    waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1);
+    
+    // Allocate one more container with 1 GB resource in b1 (container id 4)
+    waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1);
+    
+    // Total is 100 GB, 
+    // B2 uses 41 GB (5 * 8GB containers and 1 AM container)
+    // B1 uses 4 GB (3 * 1GB containers and 1 AM container)
+    // A1 uses 25 GB (3 * 8GB containers and 1 AM container)
+    // Available is 100 - 41 - 4 - 25 = 30 GB
+    Assert.assertEquals(30 * GB,
+        am1.doHeartbeat().getAvailableResources().getMemory());
+  }
 
   private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index af58a43..7edb17d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -143,7 +144,9 @@ public class TestChildQueueOrder {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
-          when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
+          when(queue)
+              .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+                  any(ResourceLimits.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -154,7 +157,8 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
+    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), 
+        any(ResourceLimits.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
 
@@ -270,14 +274,16 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     for(int i=0; i < 2; i++)
     {
       stubQueueAllocation(a, clusterResource, node_0, 0*GB);
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     } 
     for(int i=0; i < 3; i++)
     {
@@ -285,7 +291,8 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     }  
     for(int i=0; i < 4; i++)
     {
@@ -293,7 +300,8 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -326,7 +334,8 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false);
+      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+          clusterResource));
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -353,7 +362,8 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -379,7 +389,8 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -393,12 +404,13 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+        clusterResource));
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);


Mime
View raw message