hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sun...@apache.org
Subject [17/23] hadoop git commit: YARN-6471. Support to add min/max resource configuration for a queue. (Sunil G via wangda)
Date Wed, 29 Nov 2017 09:30:03 GMT
YARN-6471. Support to add min/max resource configuration for a queue. (Sunil G via wangda)

Change-Id: I9213f5297a6841fab5c573e85ee4c4e5f4a0b7ff


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba444cbb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba444cbb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba444cbb

Branch: refs/heads/YARN-5881
Commit: ba444cbb47f98999d9f56d71dc4b7ac7e4f9def5
Parents: d331762
Author: Wangda Tan <wangda@apache.org>
Authored: Fri Aug 11 10:30:23 2017 -0700
Committer: Sunil G <sunilg@apache.org>
Committed: Wed Nov 29 14:59:09 2017 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/util/StringUtils.java     |  31 ++
 .../resource/DefaultResourceCalculator.java     |   6 +
 .../resource/DominantResourceCalculator.java    |   7 +
 .../yarn/util/resource/ResourceCalculator.java  |  12 +
 .../hadoop/yarn/util/resource/Resources.java    |   5 +
 .../capacity/FifoCandidatesSelector.java        |   9 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  10 +-
 .../monitor/capacity/TempQueuePerPartition.java |  16 +-
 .../scheduler/AbstractResourceUsage.java        | 198 +++++++
 .../scheduler/QueueResourceQuotas.java          | 153 ++++++
 .../scheduler/ResourceUsage.java                | 237 ++-------
 .../scheduler/capacity/AbstractCSQueue.java     | 162 +++++-
 .../scheduler/capacity/CSQueue.java             |  39 ++
 .../scheduler/capacity/CSQueueUtils.java        |  24 +-
 .../CapacitySchedulerConfiguration.java         | 179 ++++++-
 .../scheduler/capacity/LeafQueue.java           |  31 +-
 .../scheduler/capacity/ParentQueue.java         | 201 +++++++-
 .../scheduler/capacity/UsersManager.java        |   5 +-
 .../PriorityUtilizationQueueOrderingPolicy.java |  11 +
 .../webapp/dao/CapacitySchedulerQueueInfo.java  |  15 +
 .../yarn/server/resourcemanager/MockNM.java     |   8 +
 .../yarn/server/resourcemanager/MockRM.java     |   6 +
 ...alCapacityPreemptionPolicyMockFramework.java |  13 +
 ...estProportionalCapacityPreemptionPolicy.java |  29 +-
 ...pacityPreemptionPolicyIntraQueueWithDRF.java |   6 +-
 .../TestAbsoluteResourceConfiguration.java      | 516 +++++++++++++++++++
 .../capacity/TestApplicationLimits.java         |  30 +-
 .../TestApplicationLimitsByPartition.java       |   4 +
 .../capacity/TestCapacityScheduler.java         |   2 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   2 +
 .../scheduler/capacity/TestLeafQueue.java       | 261 ++++------
 .../scheduler/capacity/TestParentQueue.java     |   8 +
 .../scheduler/capacity/TestReservations.java    |  19 +
 ...tPriorityUtilizationQueueOrderingPolicy.java |   3 +
 .../webapp/TestRMWebServicesCapacitySched.java  |   4 +-
 35 files changed, 1831 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index cda5ec7..1be8a08 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -1152,4 +1152,35 @@ public class StringUtils {
     return s1.equalsIgnoreCase(s2);
   }
 
+  /**
+   * <p>Checks if the String contains only unicode letters.</p>
+   *
+   * <p><code>null</code> will return <code>false</code>.
+   * An empty String (length()=0) will return <code>true</code>.</p>
+   *
+   * <pre>
+   * StringUtils.isAlpha(null)   = false
+   * StringUtils.isAlpha("")     = true
+   * StringUtils.isAlpha("  ")   = false
+   * StringUtils.isAlpha("abc")  = true
+   * StringUtils.isAlpha("ab2c") = false
+   * StringUtils.isAlpha("ab-c") = false
+   * </pre>
+   *
+   * @param str  the String to check, may be null
+   * @return <code>true</code> if only contains letters, and is non-null
+   */
+  public static boolean isAlpha(String str) {
+      if (str == null) {
+          return false;
+      }
+      int sz = str.length();
+      for (int i = 0; i < sz; i++) {
+          if (Character.isLetter(str.charAt(i)) == false) {
+              return false;
+          }
+      }
+      return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 7f155e7..aefa85c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -131,4 +131,10 @@ public class DefaultResourceCalculator extends ResourceCalculator {
   public boolean isAnyMajorResourceZero(Resource resource) {
     return resource.getMemorySize() == 0f;
   }
+
+  @Override
+  public Resource normalizeDown(Resource r, Resource stepFactor) {
+    return Resources.createResource(
+        roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 6b284e3..16e4527 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -567,4 +567,11 @@ public class DominantResourceCalculator extends ResourceCalculator {
     }
     return false;
   }
+
+  @Override
+  public Resource normalizeDown(Resource r, Resource stepFactor) {
+    return Resources.createResource(
+        roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
+        roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index d59560f..a816290 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -235,4 +235,16 @@ public abstract class ResourceCalculator {
    * @return returns true if any resource is zero.
    */
   public abstract boolean isAnyMajorResourceZero(Resource resource);
+
+  /**
+   * Get resource <code>r</code> and normalize it down using step-factor
+   * <code>stepFactor</code>.
+   *
+   * @param r
+   *          resource to be normalized
+   * @param stepFactor
+   *          factor by which to normalize down
+   * @return resulting normalized resource
+   */
+  public abstract Resource normalizeDown(Resource r, Resource stepFactor);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 068e7f1..0426ab7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -546,4 +546,9 @@ public class Resources {
       Resource resource) {
     return rc.isAnyMajorResourceZero(resource);
   }
+
+  public static Resource normalizeDown(ResourceCalculator calculator,
+      Resource resource, Resource factor) {
+    return calculator.normalizeDown(resource, factor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index f843db4..748548a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -140,10 +140,10 @@ public class FifoCandidatesSelector
         // Can try preempting AMContainers (still saving atmost
         // maxAMCapacityForThisQueue AMResource's) if more resources are
         // required to be preemptionCandidates from this Queue.
-        Resource maxAMCapacityForThisQueue = Resources.multiply(
-            Resources.multiply(clusterResource,
-                leafQueue.getAbsoluteCapacity()),
-            leafQueue.getMaxAMResourcePerQueuePercent());
+        Resource maxAMCapacityForThisQueue = Resources
+            .multiply(
+                leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
+                leafQueue.getMaxAMResourcePerQueuePercent());
 
         preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
             resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
@@ -199,7 +199,6 @@ public class FifoCandidatesSelector
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
    */
-  @SuppressWarnings("unchecked")
   private void preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 2c072d2..8327cb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolic
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -525,6 +526,13 @@ public class ProportionalCapacityPreemptionPolicy
       float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
       boolean preemptionDisabled = curQueue.getPreemptionDisabled();
 
+      QueueResourceQuotas queueResourceQuotas = curQueue
+          .getQueueResourceQuotas();
+      Resource effMinRes = queueResourceQuotas
+          .getEffectiveMinResource(partitionToLookAt);
+      Resource effMaxRes = queueResourceQuotas
+          .getEffectiveMaxResource(partitionToLookAt);
+
       Resource current = Resources
           .clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
       Resource killable = Resources.none();
@@ -550,7 +558,7 @@ public class ProportionalCapacityPreemptionPolicy
 
       ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
           partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
-          reserved, curQueue);
+          reserved, curQueue, effMinRes, effMaxRes);
 
       if (curQueue instanceof ParentQueue) {
         String configuredOrderingPolicy =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 89452f9..bd236fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -48,6 +48,9 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
 
   double normalizedGuarantee;
 
+  private Resource effMinRes;
+  private Resource effMaxRes;
+
   final ArrayList<TempQueuePerPartition> children;
   private Collection<TempAppPerPartition> apps;
   LeafQueue leafQueue;
@@ -68,7 +71,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
-      Resource reserved, CSQueue queue) {
+      Resource reserved, CSQueue queue, Resource effMinRes,
+      Resource effMaxRes) {
     super(queueName, current, Resource.newInstance(0, 0), reserved,
         Resource.newInstance(0, 0));
 
@@ -95,6 +99,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     this.absCapacity = absCapacity;
     this.absMaxCapacity = absMaxCapacity;
     this.totalPartitionResource = totalPartitionResource;
+    this.effMinRes = effMinRes;
+    this.effMaxRes = effMaxRes;
   }
 
   public void setLeafQueue(LeafQueue l) {
@@ -177,10 +183,18 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   }
 
   public Resource getGuaranteed() {
+    if(!effMinRes.equals(Resources.none())) {
+      return Resources.clone(effMinRes);
+    }
+
     return Resources.multiply(totalPartitionResource, absCapacity);
   }
 
   public Resource getMax() {
+    if(!effMaxRes.equals(Resources.none())) {
+      return Resources.clone(effMaxRes);
+    }
+
     return Resources.multiply(totalPartitionResource, absMaxCapacity);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
new file mode 100644
index 0000000..c295323
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This class can be used to track resource usage in queue/user/app.
+ *
+ * And it is thread-safe
+ */
+public class AbstractResourceUsage {
+  protected ReadLock readLock;
+  protected WriteLock writeLock;
+  protected Map<String, UsageByLabel> usages;
+  // short for no-label :)
+  private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+
+  public AbstractResourceUsage() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+
+    usages = new HashMap<String, UsageByLabel>();
+    usages.put(NL, new UsageByLabel(NL));
+  }
+
+  // Use an enum here to make the implementation cleaner
+  public enum ResourceType {
+    // CACHED_USED and CACHED_PENDING may be read by anyone, but must only
+    // be written by ordering policies
+    USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING(
+        5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8), EFF_MIN_RESOURCE(
+            9), EFF_MAX_RESOURCE(
+                10), EFF_MIN_RESOURCE_UP(11), EFF_MAX_RESOURCE_UP(12);
+
+    private int idx;
+
+    private ResourceType(int value) {
+      this.idx = value;
+    }
+  }
+
+  public static class UsageByLabel {
+    // usage by label, contains one Resource per ResourceType
+    private Resource[] resArr;
+
+    public UsageByLabel(String label) {
+      resArr = new Resource[ResourceType.values().length];
+      for (int i = 0; i < resArr.length; i++) {
+        resArr[i] = Resource.newInstance(0, 0);
+      };
+    }
+
+    public Resource getUsed() {
+      return resArr[ResourceType.USED.idx];
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("{used=" + resArr[0] + "%, ");
+      sb.append("pending=" + resArr[1] + "%, ");
+      sb.append("am_used=" + resArr[2] + "%, ");
+      sb.append("reserved=" + resArr[3] + "%}");
+      sb.append("min_eff=" + resArr[9] + "%, ");
+      sb.append("max_eff=" + resArr[10] + "%}");
+      sb.append("min_effup=" + resArr[11] + "%, ");
+      return sb.toString();
+    }
+  }
+
+  private static Resource normalize(Resource res) {
+    if (res == null) {
+      return Resources.none();
+    }
+    return res;
+  }
+
+  protected Resource _get(String label, ResourceType type) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    try {
+      readLock.lock();
+      UsageByLabel usage = usages.get(label);
+      if (null == usage) {
+        return Resources.none();
+      }
+      return normalize(usage.resArr[type.idx]);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected Resource _getAll(ResourceType type) {
+    try {
+      readLock.lock();
+      Resource allOfType = Resources.createResource(0);
+      for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
+        // all usage types are initialized
+        Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
+      }
+      return allOfType;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private UsageByLabel getAndAddIfMissing(String label) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
+    if (!usages.containsKey(label)) {
+      UsageByLabel u = new UsageByLabel(label);
+      usages.put(label, u);
+      return u;
+    }
+
+    return usages.get(label);
+  }
+
+  protected void _set(String label, ResourceType type, Resource res) {
+    try {
+      writeLock.lock();
+      UsageByLabel usage = getAndAddIfMissing(label);
+      usage.resArr[type.idx] = res;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected void _inc(String label, ResourceType type, Resource res) {
+    try {
+      writeLock.lock();
+      UsageByLabel usage = getAndAddIfMissing(label);
+      Resources.addTo(usage.resArr[type.idx], res);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected void _dec(String label, ResourceType type, Resource res) {
+    try {
+      writeLock.lock();
+      UsageByLabel usage = getAndAddIfMissing(label);
+      Resources.subtractFrom(usage.resArr[type.idx], res);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    try {
+      readLock.lock();
+      return usages.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public Set<String> getNodePartitionsSet() {
+    try {
+      readLock.lock();
+      return usages.keySet();
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
new file mode 100644
index 0000000..2e653fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueResourceQuotas.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+/**
+ * QueueResourceQuotas tracks the following fields by label:
+ * - MIN_RESOURCE / MAX_RESOURCE (configured)
+ * - EFF_MIN_RESOURCE / EFF_MAX_RESOURCE (effective, plus the *_UP variants)
+ * This class can be used to track resource usage in queue/user/app.
+ *
+ * It is thread-safe.
+ */
+public class QueueResourceQuotas extends AbstractResourceUsage {
+  // short for no-label :)
+  private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+
+  public QueueResourceQuotas() {
+    super();
+  }
+
+  /*
+   * Configured Minimum Resource
+   */
+  public Resource getConfiguredMinResource() {
+    return _get(NL, ResourceType.MIN_RESOURCE);
+  }
+
+  public Resource getConfiguredMinResource(String label) {
+    return _get(label, ResourceType.MIN_RESOURCE);
+  }
+
+  public void setConfiguredMinResource(String label, Resource res) {
+    _set(label, ResourceType.MIN_RESOURCE, res);
+  }
+
+  public void setConfiguredMinResource(Resource res) {
+    _set(NL, ResourceType.MIN_RESOURCE, res);
+  }
+
+  /*
+   * Configured Maximum Resource
+   */
+  public Resource getConfiguredMaxResource() {
+    return getConfiguredMaxResource(NL);
+  }
+
+  public Resource getConfiguredMaxResource(String label) {
+    return _get(label, ResourceType.MAX_RESOURCE);
+  }
+
+  public void setConfiguredMaxResource(Resource res) {
+    setConfiguredMaxResource(NL, res);
+  }
+
+  public void setConfiguredMaxResource(String label, Resource res) {
+    _set(label, ResourceType.MAX_RESOURCE, res);
+  }
+
+  /*
+   * Effective Minimum Resource
+   */
+  public Resource getEffectiveMinResource() {
+    return _get(NL, ResourceType.EFF_MIN_RESOURCE);
+  }
+
+  public Resource getEffectiveMinResource(String label) {
+    return _get(label, ResourceType.EFF_MIN_RESOURCE);
+  }
+
+  public void setEffectiveMinResource(String label, Resource res) {
+    _set(label, ResourceType.EFF_MIN_RESOURCE, res);
+  }
+
+  public void setEffectiveMinResource(Resource res) {
+    _set(NL, ResourceType.EFF_MIN_RESOURCE, res);
+  }
+
+  /*
+   * Effective Maximum Resource
+   */
+  public Resource getEffectiveMaxResource() {
+    return getEffectiveMaxResource(NL);
+  }
+
+  public Resource getEffectiveMaxResource(String label) {
+    return _get(label, ResourceType.EFF_MAX_RESOURCE);
+  }
+
+  public void setEffectiveMaxResource(Resource res) {
+    setEffectiveMaxResource(NL, res);
+  }
+
+  public void setEffectiveMaxResource(String label, Resource res) {
+    _set(label, ResourceType.EFF_MAX_RESOURCE, res);
+  }
+
+  /*
+   * Effective Minimum Resource Up (EFF_MIN_RESOURCE_UP)
+   */
+  public Resource getEffectiveMinResourceUp() {
+    return _get(NL, ResourceType.EFF_MIN_RESOURCE_UP);
+  }
+
+  public Resource getEffectiveMinResourceUp(String label) {
+    return _get(label, ResourceType.EFF_MIN_RESOURCE_UP);
+  }
+
+  public void setEffectiveMinResourceUp(String label, Resource res) {
+    _set(label, ResourceType.EFF_MIN_RESOURCE_UP, res);
+  }
+
+  public void setEffectiveMinResourceUp(Resource res) {
+    _set(NL, ResourceType.EFF_MIN_RESOURCE_UP, res);
+  }
+
+  /*
+   * Effective Maximum Resource Up (EFF_MAX_RESOURCE_UP)
+   */
+  public Resource getEffectiveMaxResourceUp() {
+    return getEffectiveMaxResourceUp(NL);
+  }
+
+  public Resource getEffectiveMaxResourceUp(String label) {
+    return _get(label, ResourceType.EFF_MAX_RESOURCE_UP);
+  }
+
+  public void setEffectiveMaxResourceUp(Resource res) {
+    setEffectiveMaxResourceUp(NL, res);
+  }
+
+  public void setEffectiveMaxResourceUp(String label, Resource res) {
+    _set(label, ResourceType.EFF_MAX_RESOURCE_UP, res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 6f0c7d2..ede4aec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -39,63 +39,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * 
  * And it is thread-safe
  */
-public class ResourceUsage {
-  private ReadLock readLock;
-  private WriteLock writeLock;
-  private Map<String, UsageByLabel> usages;
+public class ResourceUsage extends AbstractResourceUsage {
   // short for no-label :)
   private static final String NL = CommonNodeLabelsManager.NO_LABEL;
-  private final UsageByLabel usageNoLabel;
 
   public ResourceUsage() {
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    readLock = lock.readLock();
-    writeLock = lock.writeLock();
-
-    usages = new HashMap<String, UsageByLabel>();
-    usageNoLabel = new UsageByLabel(NL);
-    usages.put(NL, usageNoLabel);
-  }
-
-  // Usage enum here to make implement cleaner
-  private enum ResourceType {
-    //CACHED_USED and CACHED_PENDING may be read by anyone, but must only
-    //be written by ordering policies
-    USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
-      CACHED_PENDING(5), AMLIMIT(6);
-
-    private int idx;
-
-    private ResourceType(int value) {
-      this.idx = value;
-    }
-  }
-
-  private static class UsageByLabel {
-    // usage by label, contains all UsageType
-    private Resource[] resArr;
-
-    public UsageByLabel(String label) {
-      resArr = new Resource[ResourceType.values().length];
-      for (int i = 0; i < resArr.length; i++) {
-        resArr[i] = Resource.newInstance(0, 0);
-      };
-    }
-    
-    public Resource getUsed() {
-      return resArr[ResourceType.USED.idx];
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("{used=" + resArr[0] + "%, ");
-      sb.append("pending=" + resArr[1] + "%, ");
-      sb.append("am_used=" + resArr[2] + "%, ");
-      sb.append("reserved=" + resArr[3] + "%}");
-      sb.append("am_limit=" + resArr[6] + "%, ");
-      return sb.toString();
-    }
+    super();
   }
 
   /*
@@ -109,22 +58,6 @@ public class ResourceUsage {
     return _get(label, ResourceType.USED);
   }
 
-  public Resource getCachedUsed() {
-    return _get(NL, ResourceType.CACHED_USED);
-  }
-
-  public Resource getCachedUsed(String label) {
-    return _get(label, ResourceType.CACHED_USED);
-  }
-
-  public Resource getCachedPending() {
-    return _get(NL, ResourceType.CACHED_PENDING);
-  }
-
-  public Resource getCachedPending(String label) {
-    return _get(label, ResourceType.CACHED_PENDING);
-  }
-
   public void incUsed(String label, Resource res) {
     _inc(label, ResourceType.USED, res);
   }
@@ -145,7 +78,7 @@ public class ResourceUsage {
     setUsed(NL, res);
   }
   
-  public void copyAllUsed(ResourceUsage other) {
+  public void copyAllUsed(AbstractResourceUsage other) {
     try {
       writeLock.lock();
       for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
@@ -160,22 +93,6 @@ public class ResourceUsage {
     _set(label, ResourceType.USED, res);
   }
 
-  public void setCachedUsed(String label, Resource res) {
-    _set(label, ResourceType.CACHED_USED, res);
-  }
-
-  public void setCachedUsed(Resource res) {
-    _set(NL, ResourceType.CACHED_USED, res);
-  }
-
-  public void setCachedPending(String label, Resource res) {
-    _set(label, ResourceType.CACHED_PENDING, res);
-  }
-
-  public void setCachedPending(Resource res) {
-    _set(NL, ResourceType.CACHED_PENDING, res);
-  }
-
   /*
    * Pending
    */
@@ -281,6 +198,47 @@ public class ResourceUsage {
     _set(label, ResourceType.AMUSED, res);
   }
 
+  /**
+   * Returns the aggregate PENDING resource as computed by {@code _getAll}.
+   * NOTE(review): the private {@code _getAll} removed from this class summed
+   * the entry of every label; presumably the inherited one does the same -
+   * confirm against AbstractResourceUsage.
+   *
+   * @return result of {@code _getAll(ResourceType.PENDING)}
+   */
+  public Resource getAllPending() {
+    final Resource pendingTotal = _getAll(ResourceType.PENDING);
+    return pendingTotal;
+  }
+
+  /**
+   * Returns the aggregate USED resource as computed by {@code _getAll}.
+   * NOTE(review): presumably the sum over every label, as for
+   * {@link #getAllPending()} - confirm against AbstractResourceUsage.
+   *
+   * @return result of {@code _getAll(ResourceType.USED)}
+   */
+  public Resource getAllUsed() {
+    final Resource usedTotal = _getAll(ResourceType.USED);
+    return usedTotal;
+  }
+
+  // Cached used / cached pending accessors.
+  // NOTE(review): the ResourceType comment that this patch removes from this
+  // class states that CACHED_USED and CACHED_PENDING may be read by anyone but
+  // must only be written by ordering policies - presumably still the rule.
+
+  /**
+   * @param label node label (partition) to query
+   * @return the CACHED_USED entry {@code _get} yields for that label
+   */
+  public Resource getCachedUsed(String label) {
+    return _get(label, ResourceType.CACHED_USED);
+  }
+
+  /**
+   * @return the CACHED_USED entry of the no-label ({@code NL}) partition
+   */
+  public Resource getCachedUsed() {
+    return _get(NL, ResourceType.CACHED_USED);
+  }
+
+  /**
+   * @param label node label (partition) to query
+   * @return the CACHED_PENDING entry {@code _get} yields for that label
+   */
+  public Resource getCachedPending(String label) {
+    return _get(label, ResourceType.CACHED_PENDING);
+  }
+
+  /**
+   * @return the CACHED_PENDING entry of the no-label ({@code NL}) partition
+   */
+  public Resource getCachedPending() {
+    return _get(NL, ResourceType.CACHED_PENDING);
+  }
+
+  /**
+   * Records the CACHED_USED entry of the no-label ({@code NL}) partition.
+   *
+   * @param res value handed to {@code _set}
+   */
+  public void setCachedUsed(Resource res) {
+    _set(NL, ResourceType.CACHED_USED, res);
+  }
+
+  /**
+   * Records the CACHED_USED entry of a node label.
+   *
+   * @param label node label (partition) to update
+   * @param res value handed to {@code _set}
+   */
+  public void setCachedUsed(String label, Resource res) {
+    _set(label, ResourceType.CACHED_USED, res);
+  }
+
+  /**
+   * Records the CACHED_PENDING entry of the no-label ({@code NL}) partition.
+   *
+   * @param res value handed to {@code _set}
+   */
+  public void setCachedPending(Resource res) {
+    _set(NL, ResourceType.CACHED_PENDING, res);
+  }
+
+  /**
+   * Records the CACHED_PENDING entry of a node label.
+   *
+   * @param label node label (partition) to update
+   * @param res value handed to {@code _set}
+   */
+  public void setCachedPending(String label, Resource res) {
+    _set(label, ResourceType.CACHED_PENDING, res);
+  }
+
+
   /*
    * AM-Resource Limit
    */
@@ -316,94 +274,6 @@ public class ResourceUsage {
     _set(label, ResourceType.AMLIMIT, res);
   }
 
-  private static Resource normalize(Resource res) {
-    if (res == null) {
-      return Resources.none();
-    }
-    return res;
-  }
-
-  private Resource _get(String label, ResourceType type) {
-    if (label == null || label.equals(NL)) {
-      return normalize(usageNoLabel.resArr[type.idx]);
-    }
-    try {
-      readLock.lock();
-      UsageByLabel usage = usages.get(label);
-      if (null == usage) {
-        return Resources.none();
-      }
-      return normalize(usage.resArr[type.idx]);
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  private Resource _getAll(ResourceType type) {
-    try {
-      readLock.lock();
-      Resource allOfType = Resources.createResource(0);
-      for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
-        //all usages types are initialized
-        Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
-      }
-      return allOfType;
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  public Resource getAllPending() {
-    return _getAll(ResourceType.PENDING);
-  }
-  
-  public Resource getAllUsed() {
-    return _getAll(ResourceType.USED);
-  }
-
-  private UsageByLabel getAndAddIfMissing(String label) {
-    if (label == null || label.equals(NL)) {
-      return usageNoLabel;
-    }
-    if (!usages.containsKey(label)) {
-      UsageByLabel u = new UsageByLabel(label);
-      usages.put(label, u);
-      return u;
-    }
-
-    return usages.get(label);
-  }
-
-  private void _set(String label, ResourceType type, Resource res) {
-    try {
-      writeLock.lock();
-      UsageByLabel usage = getAndAddIfMissing(label);
-      usage.resArr[type.idx] = res;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void _inc(String label, ResourceType type, Resource res) {
-    try {
-      writeLock.lock();
-      UsageByLabel usage = getAndAddIfMissing(label);
-      Resources.addTo(usage.resArr[type.idx], res);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void _dec(String label, ResourceType type, Resource res) {
-    try {
-      writeLock.lock();
-      UsageByLabel usage = getAndAddIfMissing(label);
-      Resources.subtractFrom(usage.resArr[type.idx], res);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
   public Resource getCachedDemand(String label) {
     try {
       readLock.lock();
@@ -415,23 +285,4 @@ public class ResourceUsage {
       readLock.unlock();
     }
   }
-  
-  @Override
-  public String toString() {
-    try {
-      readLock.lock();
-      return usages.toString();
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  public Set<String> getNodePartitionsSet() {
-    try {
-      readLock.lock();
-      return usages.keySet();
-    } finally {
-      readLock.unlock();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 74c85ce..dddac4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -87,6 +89,7 @@ public abstract class AbstractCSQueue implements CSQueue {
 
   final ResourceCalculator resourceCalculator;
   Set<String> accessibleLabels;
+  Set<String> resourceTypes;
   final RMNodeLabelsManager labelManager;
   String defaultLabelExpression;
   
@@ -102,6 +105,14 @@ public abstract class AbstractCSQueue implements CSQueue {
   // etc.
   QueueCapacities queueCapacities;
 
+  QueueResourceQuotas queueResourceQuotas;
+
+  /**
+   * How the capacity of this queue is configured. The mode is derived in
+   * updateConfigurableResourceRequirement from the configured min resource.
+   */
+  protected enum CapacityConfigType {
+    /** Initial value; the mode has not been derived yet. */
+    NONE,
+    /** Chosen when the absolute-resource condition below does not hold. */
+    PERCENTAGE,
+    /** Chosen when a min resource is set and absolute capacity is zero. */
+    ABSOLUTE_RESOURCE
+  }
+
+  /** Mode currently in effect for this queue; starts out as NONE. */
+  protected CapacityConfigType capacityConfigType = CapacityConfigType.NONE;
+
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
   protected CapacitySchedulerContext csContext;
@@ -141,6 +152,9 @@ public abstract class AbstractCSQueue implements CSQueue {
     // initialize QueueCapacities
     queueCapacities = new QueueCapacities(parent == null);
 
+    // initialize queueResourceQuotas
+    queueResourceQuotas = new QueueResourceQuotas();
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -276,6 +290,10 @@ public abstract class AbstractCSQueue implements CSQueue {
       this.defaultLabelExpression =
           csContext.getConfiguration().getDefaultNodeLabelExpression(
               getQueuePath());
+      this.resourceTypes = new HashSet<String>();
+      for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
+        resourceTypes.add(type.toString().toLowerCase());
+      }
 
       // inherit from parent if labels not set
       if (this.accessibleLabels == null && parent != null) {
@@ -292,6 +310,11 @@ public abstract class AbstractCSQueue implements CSQueue {
       // After we setup labels, we can setup capacities
       setupConfigurableCapacities();
 
+      // Also fetch minimum/maximum resource constraint for this queue if
+      // configured.
+      capacityConfigType = CapacityConfigType.NONE;
+      updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
+
       this.maximumAllocation =
           csContext.getConfiguration().getMaximumAllocationPerQueue(
               getQueuePath());
@@ -364,6 +387,125 @@ public abstract class AbstractCSQueue implements CSQueue {
     return unionInheritedWeights;
   }
 
+  /**
+   * Reads the configured min/max resource of this queue for every configured
+   * node label, validates the pair and records it in queueResourceQuotas.
+   *
+   * <p>While capacityConfigType is still {@code NONE}, the first label seen
+   * decides whether the queue is treated as {@code ABSOLUTE_RESOURCE} or
+   * {@code PERCENTAGE}.
+   *
+   * @param queuePath path of the queue whose configuration is read
+   * @param clusterResource cluster resource handed to the
+   *          {@code Resources.greaterThan} comparisons
+   * @throws IllegalArgumentException if the min resource exceeds the max
+   *           resource, if the max resource exceeds the parent's configured
+   *           max, or if the queue mixes percentage and absolute configuration
+   */
+  protected void updateConfigurableResourceRequirement(String queuePath,
+      Resource clusterResource) {
+    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    Set<String> configuredNodelabels = conf.getConfiguredNodeLabels(queuePath);
+
+    for (String label : configuredNodelabels) {
+      Resource minResource = conf.getMinimumResourceRequirement(label,
+          queuePath, resourceTypes);
+      Resource maxResource = conf.getMaximumResourceRequirement(label,
+          queuePath, resourceTypes);
+
+      // Derive the configuration mode once: a configured min resource together
+      // with a zero absolute capacity selects ABSOLUTE_RESOURCE, anything else
+      // selects PERCENTAGE.
+      if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
+        this.capacityConfigType = (!minResource.equals(Resources.none())
+            && queueCapacities.getAbsoluteCapacity(label) == 0f)
+                ? CapacityConfigType.ABSOLUTE_RESOURCE
+                : CapacityConfigType.PERCENTAGE;
+        if (LOG.isDebugEnabled()) {
+          // Close the quote around the queue name so the message is balanced.
+          LOG.debug("capacityConfigType is updated as '" + capacityConfigType
+              + "' for queue '" + getQueueName() + "'");
+        }
+      }
+
+      // Every label must agree with the mode derived above.
+      validateAbsoluteVsPercentageCapacityConfig(minResource);
+
+      // If min resource for a resource type is greater than its max resource,
+      // throw exception to handle such error configs.
+      if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
+          resourceCalculator, clusterResource, minResource, maxResource)) {
+        throw new IllegalArgumentException("Min resource configuration "
+            + minResource + " is greater than its max value:" + maxResource
+            + " in queue:" + getQueueName());
+      }
+
+      // If parent's max resource is lesser to a specific child's max
+      // resource, throw exception to handle such error configs.
+      if (parent != null) {
+        Resource parentMaxRes = parent.getQueueResourceQuotas()
+            .getConfiguredMaxResource(label);
+        // Only enforce the parent limit when the parent configured one.
+        if (Resources.greaterThan(resourceCalculator, clusterResource,
+            parentMaxRes, Resources.none())) {
+          if (Resources.greaterThan(resourceCalculator, clusterResource,
+              maxResource, parentMaxRes)) {
+            throw new IllegalArgumentException("Max resource configuration "
+                + maxResource + " is greater than parents max value:"
+                + parentMaxRes + " in queue:" + getQueueName());
+          }
+
+          // If child's max resource is not set, but its parent max resource is
+          // set, we must set child max resource to its parent's.
+          if (maxResource.equals(Resources.none())
+              && !minResource.equals(Resources.none())) {
+            maxResource = Resources.clone(parentMaxRes);
+          }
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating absolute resource configuration for queue:"
+            + getQueueName() + " as minResource=" + minResource
+            + " and maxResource=" + maxResource);
+      }
+
+      queueResourceQuotas.setConfiguredMinResource(label, minResource);
+      queueResourceQuotas.setConfiguredMaxResource(label, maxResource);
+    }
+  }
+
+  /**
+   * Rejects a queue whose per-label configuration disagrees with the mode
+   * already stored in capacityConfigType. The queue named "root" is exempt.
+   *
+   * @param minResource configured min resource of the label being checked;
+   *          anything other than {@code Resources.none()} counts as an
+   *          absolute-resource configuration
+   * @throws IllegalArgumentException if a non-root queue mixes both modes
+   */
+  private void validateAbsoluteVsPercentageCapacityConfig(
+      Resource minResource) {
+    final CapacityConfigType expected = minResource.equals(Resources.none())
+        ? CapacityConfigType.PERCENTAGE
+        : CapacityConfigType.ABSOLUTE_RESOURCE;
+
+    if (queueName.equals("root") || this.capacityConfigType.equals(expected)) {
+      return;
+    }
+
+    throw new IllegalArgumentException("Queue '" + getQueueName()
+        + "' should use either percentage based capacity"
+        + " configuration or absolute resource.");
+  }
+
+  /**
+   * @return the capacity configuration mode currently stored for this queue
+   */
+  @Override
+  public CapacityConfigType getCapacityConfigType() {
+    return this.capacityConfigType;
+  }
+
+  /**
+   * @param label node label (partition) to query
+   * @return a clone of the effective min resource held in the queue's
+   *         QueueResourceQuotas for that label
+   */
+  @Override
+  public Resource getEffectiveCapacity(String label) {
+    Resource effectiveMin =
+        getQueueResourceQuotas().getEffectiveMinResource(label);
+    return Resources.clone(effectiveMin);
+  }
+
+  /**
+   * @param label node label (partition) to query
+   * @return a clone of the effective-min-resource-up value held in the queue's
+   *         QueueResourceQuotas for that label
+   */
+  @Override
+  public Resource getEffectiveCapacityUp(String label) {
+    Resource effectiveMinUp =
+        getQueueResourceQuotas().getEffectiveMinResourceUp(label);
+    return Resources.clone(effectiveMinUp);
+  }
+
+  /**
+   * Returns the effective min resource of a label after passing it through
+   * {@code Resources.normalizeDown} with this queue's minimumAllocation.
+   *
+   * <p>NOTE(review): {@code factor} is never read; the value handed to
+   * normalizeDown is always minimumAllocation. Confirm whether callers expect
+   * the factor to be honoured.
+   *
+   * @param label node label (partition) to query
+   * @param factor currently unused
+   * @return the normalized-down effective min resource
+   */
+  @Override
+  public Resource getEffectiveCapacityDown(String label, Resource factor) {
+    Resource effectiveMin =
+        getQueueResourceQuotas().getEffectiveMinResource(label);
+    return Resources.normalizeDown(resourceCalculator, effectiveMin,
+        minimumAllocation);
+  }
+
+  /**
+   * @param label node label (partition) to query
+   * @return a clone of the effective max resource held in the queue's
+   *         QueueResourceQuotas for that label
+   */
+  @Override
+  public Resource getEffectiveMaxCapacity(String label) {
+    Resource effectiveMax =
+        getQueueResourceQuotas().getEffectiveMaxResource(label);
+    return Resources.clone(effectiveMax);
+  }
+
+  /**
+   * Returns the effective max resource of a label after passing it through
+   * {@code Resources.normalizeDown} with this queue's minimumAllocation.
+   *
+   * <p>NOTE(review): {@code factor} is never read; the value handed to
+   * normalizeDown is always minimumAllocation. Confirm whether callers expect
+   * the factor to be honoured.
+   *
+   * @param label node label (partition) to query
+   * @param factor currently unused
+   * @return the normalized-down effective max resource
+   */
+  @Override
+  public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
+    Resource effectiveMax =
+        getQueueResourceQuotas().getEffectiveMaxResource(label);
+    return Resources.normalizeDown(resourceCalculator, effectiveMax,
+        minimumAllocation);
+  }
+
   private void initializeQueueState(QueueState previousState,
       QueueState configuredState, QueueState parentState) {
     // verify that we can not any value for State other than RUNNING/STOPPED
@@ -555,6 +697,11 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
 
   @Override
+  public QueueResourceQuotas getQueueResourceQuotas() {
+    return queueResourceQuotas;
+  }
+
+  @Override
   public ReentrantReadWriteLock.ReadLock getReadLock() {
     return readLock;
   }
@@ -604,7 +751,7 @@ public abstract class AbstractCSQueue implements CSQueue {
        * limit-set-by-parent)
        */
       Resource queueMaxResource =
-          getQueueMaxResource(nodePartition, clusterResource);
+          getQueueMaxResource(nodePartition);
 
       return Resources.min(resourceCalculator, clusterResource,
           queueMaxResource, currentResourceLimits.getLimit());
@@ -617,11 +764,8 @@ public abstract class AbstractCSQueue implements CSQueue {
     return Resources.none();
   }
 
-  Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
-    return Resources.multiplyAndNormalizeDown(resourceCalculator,
-        labelManager.getResourceByLabel(nodePartition, clusterResource),
-        queueCapacities.getAbsoluteMaximumCapacity(nodePartition),
-        minimumAllocation);
+  Resource getQueueMaxResource(String nodePartition) {
+    return getEffectiveMaxCapacity(nodePartition);
   }
 
   public boolean hasChildQueues() {
@@ -782,7 +926,7 @@ public abstract class AbstractCSQueue implements CSQueue {
     queueUsage.incUsed(nodeLabel, resourceToInc);
     CSQueueUtils.updateUsedCapacity(resourceCalculator,
         labelManager.getResourceByLabel(nodeLabel, Resources.none()),
-        nodeLabel, this);
+        Resources.none(), nodeLabel, this);
     if (null != parent) {
       parent.incUsedResource(nodeLabel, resourceToInc, null);
     }
@@ -798,7 +942,7 @@ public abstract class AbstractCSQueue implements CSQueue {
     queueUsage.decUsed(nodeLabel, resourceToDec);
     CSQueueUtils.updateUsedCapacity(resourceCalculator,
         labelManager.getResourceByLabel(nodeLabel, Resources.none()),
-        nodeLabel, this);
+        Resources.none(), nodeLabel, this);
     if (null != parent) {
       parent.decUsedResource(nodeLabel, resourceToDec, null);
     }
@@ -904,7 +1048,7 @@ public abstract class AbstractCSQueue implements CSQueue {
         Resource maxResourceLimit;
         if (allocation.getSchedulingMode()
             == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
-          maxResourceLimit = getQueueMaxResource(partition, cluster);
+          maxResourceLimit = getQueueMaxResource(partition);
         } else{
           maxResourceLimit = labelManager.getResourceByLabel(
               schedulerContainer.getNodePartition(), cluster);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 43e7f53..2e29a71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -41,10 +41,12 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -355,4 +357,41 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return map of usernames and corresponding weight
    */
   Map<String, Float> getUserWeights();
+
+  /**
+   * Get the QueueResourceQuotas associated with this queue.
+   *
+   * @return QueueResourceQuotas of this queue
+   */
+  QueueResourceQuotas getQueueResourceQuotas();
+
+  /**
+   * Get the CapacityConfigType of this queue, e.g. PERCENTAGE or
+   * ABSOLUTE_RESOURCE.
+   *
+   * @return CapacityConfigType of this queue
+   */
+  CapacityConfigType getCapacityConfigType();
+
+  /**
+   * Get effective capacity of queue. If min/max resource is configured,
+   * preference will be given to absolute configuration over normal capacity.
+   * This variant applies no rounding in AbstractCSQueue (it returns a copy of
+   * the effective min resource); see the Up/Down variants for rounded values.
+   *
+   * @param label
+   *          partition
+   * @return effective queue capacity
+   */
+  Resource getEffectiveCapacity(String label);
+
+  /**
+   * Variant of {@link #getEffectiveCapacity(String)}; AbstractCSQueue returns
+   * a copy of the queue's effective-min-resource-up quota.
+   *
+   * @param label
+   *          partition
+   * @return effective queue capacity, "up" variant
+   */
+  Resource getEffectiveCapacityUp(String label);
+
+  /**
+   * Variant of {@link #getEffectiveCapacity(String)} whose result is rounded
+   * down with normalizeDown.
+   *
+   * @param label
+   *          partition
+   * @param factor
+   *          presumably the normalization step; NOTE(review): AbstractCSQueue
+   *          ignores it and normalizes against its minimum allocation
+   * @return effective queue capacity, rounded down
+   */
+  Resource getEffectiveCapacityDown(String label, Resource factor);
+
+  /**
+   * Get effective max capacity of queue. If min/max resource is configured,
+   * preference will be given to absolute configuration over normal capacity.
+   * This variant applies no rounding in AbstractCSQueue (it returns a copy of
+   * the effective max resource); see the Down variant for the rounded value.
+   *
+   * @param label
+   *          partition
+   * @return effective max queue capacity
+   */
+  Resource getEffectiveMaxCapacity(String label);
+
+  /**
+   * Variant of {@link #getEffectiveMaxCapacity(String)} whose result is
+   * rounded down with normalizeDown.
+   *
+   * @param label
+   *          partition
+   * @param factor
+   *          presumably the normalization step; NOTE(review): AbstractCSQueue
+   *          ignores it and normalizes against its minimum allocation
+   * @return effective max queue capacity, rounded down
+   */
+  Resource getEffectiveMaxCapacityDown(String label, Resource factor);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index e1014c1..81dec80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -150,7 +150,7 @@ class CSQueueUtils {
       }
     }
   }
-  
+
   // Set absolute capacities for {capacity, maximum-capacity}
   private static void updateAbsoluteCapacitiesByNodeLabels(
       QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
@@ -180,8 +180,8 @@ class CSQueueUtils {
    * used resource for all partitions of this queue.
    */
   public static void updateUsedCapacity(final ResourceCalculator rc,
-      final Resource totalPartitionResource, String nodePartition,
-      AbstractCSQueue childQueue) {
+      final Resource totalPartitionResource, Resource clusterResource,
+      String nodePartition, AbstractCSQueue childQueue) {
     QueueCapacities queueCapacities = childQueue.getQueueCapacities();
     CSQueueMetrics queueMetrics = childQueue.getMetrics();
     ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
@@ -193,11 +193,8 @@ class CSQueueUtils {
 
     if (Resources.greaterThan(rc, totalPartitionResource,
         totalPartitionResource, Resources.none())) {
-      // queueGuaranteed = totalPartitionedResource *
-      // absolute_capacity(partition)
-      Resource queueGuranteedResource =
-          Resources.multiply(totalPartitionResource,
-              queueCapacities.getAbsoluteCapacity(nodePartition));
+      Resource queueGuranteedResource = childQueue
+          .getEffectiveCapacity(nodePartition);
 
       // make queueGuranteed >= minimum_allocation to avoid divided by 0.
       queueGuranteedResource =
@@ -248,9 +245,7 @@ class CSQueueUtils {
     for (String partition : nodeLabels) {
       // Calculate guaranteed resource for a label in a queue by below logic.
       // (total label resource) * (absolute capacity of label in that queue)
-      Resource queueGuranteedResource = Resources.multiply(nlm
-          .getResourceByLabel(partition, cluster), queue.getQueueCapacities()
-          .getAbsoluteCapacity(partition));
+      Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
 
       // Available resource in queue for a specific label will be calculated as
       // {(guaranteed resource for a label in a queue) -
@@ -289,15 +284,14 @@ class CSQueueUtils {
     ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
 
     if (nodePartition == null) {
-      for (String partition : Sets.union(
-          queueCapacities.getNodePartitionsSet(),
+      for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
           queueResourceUsage.getNodePartitionsSet())) {
         updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
-            partition, childQueue);
+            cluster, partition, childQueue);
       }
     } else {
       updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
-          nodePartition, childQueue);
+          cluster, nodePartition, childQueue);
     }
 
     // Update queue metrics w.r.t node labels. In a generic way, we can

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 4515453..f81e097 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -60,6 +61,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.Set;
 import java.util.StringTokenizer;
 
@@ -317,6 +320,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
 
+  /** Configuring absolute min/max resources in a queue **/
+  @Private
+  public static final String MINIMUM_RESOURCE = "min-resource";
+
+  @Private
+  public static final String MAXIMUM_RESOURCE = "max-resource";
+
+  public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
+
+  public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)";
+
+  /**
+   * Resource kinds recognised for absolute resource configuration.
+   * AbstractCSQueue lower-cases these names to build its resourceTypes set.
+   */
+  public enum AbsoluteResourceType {
+    MEMORY,
+    VCORES
+  }
+
   AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
 
   public CapacitySchedulerConfiguration() {
@@ -394,7 +412,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   
   public float getNonLabeledQueueCapacity(String queue) {
     float capacity = queue.equals("root") ? 100.0f : getFloat(
-        getQueuePrefix(queue) + CAPACITY, UNDEFINED);
+        getQueuePrefix(queue) + CAPACITY, 0f);
     if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
       throw new IllegalArgumentException("Illegal " +
       		"capacity of " + capacity + " for queue " + queue);
@@ -1676,4 +1694,163 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         queuePath);
     setMaximumCapacity(leafQueueConfPrefix, val);
   }
+
+  /** Returns the longest suffix of {@code resourceValue} that starts with an
+   *  alphabetic character and passes StringUtils.isAlpha, or "" when there is
+   *  none. */
+  public static String getUnits(String resourceValue) {
+    for (int pos = 0; pos < resourceValue.length(); pos++) {
+      String tail = resourceValue.substring(pos);
+      if (Character.isAlphabetic(tail.charAt(0)) && StringUtils.isAlpha(tail)) {
+        return tail;
+      }
+    }
+    return "";
+  }
+
+  /**
+   * Get absolute minimum resource requirement (min-resource) for a queue.
+   *
+   * @param label
+   *          node label whose setting is read
+   * @param queue
+   *          queue path
+   * @param resourceTypes
+   *          resource type names accepted while parsing the setting
+   * @return configured Resource, or Resources.none() if unset or memory is 0
+   */
+  public Resource getMinimumResourceRequirement(String label, String queue,
+      Set<String> resourceTypes) {
+    return internalGetLabeledResourceRequirementForQueue(queue, label,
+        resourceTypes, MINIMUM_RESOURCE);
+  }
+
+  /**
+   * Get absolute maximum resource requirement (max-resource) for a queue.
+   *
+   * @param label
+   *          node label whose setting is read
+   * @param queue
+   *          queue path
+   * @param resourceTypes
+   *          resource type names accepted while parsing the setting
+   * @return configured Resource, or Resources.none() if unset or memory is 0
+   */
+  public Resource getMaximumResourceRequirement(String label, String queue,
+      Set<String> resourceTypes) {
+    return internalGetLabeledResourceRequirementForQueue(queue, label,
+        resourceTypes, MAXIMUM_RESOURCE);
+  }
+
+  @VisibleForTesting // Stores resource as the queue's min-resource setting.
+  public void setMinimumResourceRequirement(String label, String queue,
+      Resource resource) {
+    updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE);
+  }
+
+  @VisibleForTesting // Stores resource as the queue's max-resource setting.
+  public void setMaximumResourceRequirement(String label, String queue,
+      Resource resource) {
+    updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE);
+  }
+
+  /** Writes resource as "[memory=M,vcores=V]" under the queue/label key. */
+  private void updateMinMaxResourceToConf(String label, String queue,
+      Resource resource, String type) {
+    // Root is rejected: per the message below it always takes 100% capacity.
+    if (queue.equals("root")) {
+      throw new IllegalArgumentException(
+          "Cannot set resource, root queue will take 100% of cluster capacity");
+    }
+
+    String memoryKey = AbsoluteResourceType.MEMORY.toString().toLowerCase();
+    String vcoresKey = AbsoluteResourceType.VCORES.toString().toLowerCase();
+    String value = "[" + memoryKey + "=" + resource.getMemorySize() + ","
+        + vcoresKey + "=" + resource.getVirtualCores() + "]";
+
+    // A non-empty label nests the key under ACCESSIBLE_NODE_LABELS.<label>.
+    String key = label.isEmpty()
+        ? getQueuePrefix(queue) + type
+        : getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT
+            + type;
+    set(key, value);
+  }
+
+  /**
+   * Parses the absolute resource configured for a queue and label under the
+   * given key suffix. The value is read as comma separated key=value pairs
+   * following a '[', e.g. "[memory=4Gi,vcores=2]".
+   * @param queue queue path
+   * @param label node label
+   * @param resourceTypes keys that may be read from the configured value
+   * @param suffix key suffix appended to the queue/label prefix
+   * @return the parsed Resource, or Resources.none() when the value is unset,
+   *         has no bracketed content, or leaves memory at zero
+   */
+  private Resource internalGetLabeledResourceRequirementForQueue(String queue,
+      String label, Set<String> resourceTypes, String suffix) {
+    String prefix = getNodeLabelPrefix(queue, label);
+    String configured = get(prefix + suffix);
+    if (configured == null || configured.isEmpty()) {
+      return Resources.none();
+    }
+
+    Matcher bracketed = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE)
+        .matcher(configured);
+    // No bracketed content: memory would stay zero, so the result is none.
+    if (!bracketed.find() || bracketed.group(1).trim().isEmpty()) {
+      return Resources.none();
+    }
+
+    Resource parsed = Resource.newInstance(0L, 0);
+    for (String pair : bracketed.group(1).trim().split(",")) {
+      String[] keyValue = pair.split("=");
+      // Only entries of the form key=value are handed on; others are ignored.
+      if (keyValue.length > 1) {
+        updateResourceValuesFromConfig(resourceTypes, parsed, keyValue);
+      }
+    }
+
+    // Memory has to be configured always.
+    if (parsed.getMemorySize() == 0L) {
+      return Resources.none();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix="
+          + prefix + ", capacity=" + parsed);
+    }
+    return parsed;
+  }
+
+  /**
+   * Applies one key=value entry to resource: splits[0] is the key, splits[1]
+   * the amount with an optional unit suffix that is converted to "Mi".
+   * Entries whose key is not in resourceTypes are skipped.
+   */
+  private void updateResourceValuesFromConfig(Set<String> resourceTypes,
+      Resource resource, String[] splits) {
+    // Trim so that "memory=4Gi, vcores=2" is not rejected over whitespace.
+    String key = splits[0].trim();
+    String value = splits[1].trim();
+    if (!resourceTypes.contains(key)) {
+      return;
+    }
+
+    String units = getUnits(value);
+    long amount = Long
+        .parseLong(value.substring(0, value.length() - units.length()));
+    if (!units.isEmpty()) {
+      amount = UnitsConversionUtil.convert(units, "Mi", amount);
+    }
+    switch (AbsoluteResourceType.valueOf(StringUtils.toUpperCase(key))) {
+      case MEMORY :
+        resource.setMemorySize(amount);
+        break;
+      case VCORES :
+        resource.setVirtualCores((int) amount);
+        break;
+      default :
+        break;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index e8342d9..1460121 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -671,12 +671,7 @@ public class LeafQueue extends AbstractCSQueue {
           1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
       effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
 
-      Resource queuePartitionResource = Resources
-          .multiplyAndNormalizeUp(resourceCalculator,
-              labelManager.getResourceByLabel(nodePartition,
-                  lastClusterResource),
-              queueCapacities.getAbsoluteCapacity(nodePartition),
-              minimumAllocation);
+      Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
 
       Resource userAMLimit = Resources.multiplyAndNormalizeUp(
           resourceCalculator, queuePartitionResource,
@@ -705,11 +700,7 @@ public class LeafQueue extends AbstractCSQueue {
        * non-labeled), * with per-partition am-resource-percent to get the max am
        * resource limit for this queue and partition.
        */
-      Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
-          resourceCalculator,
-          labelManager.getResourceByLabel(nodePartition, lastClusterResource),
-          queueCapacities.getAbsoluteCapacity(nodePartition),
-          minimumAllocation);
+      Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
 
       Resource queueCurrentLimit = Resources.none();
       // For non-labeled partition, we need to consider the current queue
@@ -965,6 +956,14 @@ public class LeafQueue extends AbstractCSQueue {
   private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
     // Set preemption-allowed:
     // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
+    if (!queueResourceQuotas.getEffectiveMinResource(nodePartition)
+        .equals(Resources.none())) {
+      limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
+          csContext.getClusterResource(), queueUsage.getUsed(nodePartition),
+          queueResourceQuotas.getEffectiveMinResource(nodePartition)));
+      return;
+    }
+
     float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
     float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
@@ -1344,7 +1343,7 @@ public class LeafQueue extends AbstractCSQueue {
     currentPartitionResourceLimit =
         partition.equals(RMNodeLabelsManager.NO_LABEL)
             ? currentPartitionResourceLimit
-            : getQueueMaxResource(partition, clusterResource);
+            : getQueueMaxResource(partition);
 
     Resource headroom = Resources.componentwiseMin(
         Resources.subtract(userLimitResource, user.getUsed(partition)),
@@ -1716,12 +1715,8 @@ public class LeafQueue extends AbstractCSQueue {
     // this. So need cap limits by queue's max capacity here.
     this.cachedResourceLimitsForHeadroom =
         new ResourceLimits(currentResourceLimits.getLimit());
-    Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
-            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
-            queueCapacities
-                .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
-            minimumAllocation);
+    Resource queueMaxResource = getEffectiveMaxCapacityDown(
+        RMNodeLabelsManager.NO_LABEL, minimumAllocation);
     this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
         resourceCalculator, clusterResource, queueMaxResource,
         currentResourceLimits.getLimit()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 959ca51..c770fac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -162,31 +164,78 @@ public class ParentQueue extends AbstractCSQueue {
       writeLock.lock();
       // Validate
       float childCapacities = 0;
+      Resource minResDefaultLabel = Resources.createResource(0, 0);
       for (CSQueue queue : childQueues) {
         childCapacities += queue.getCapacity();
+        Resources.addTo(minResDefaultLabel, queue.getQueueResourceQuotas()
+            .getConfiguredMinResource());
+
+        // If any child queue is using percentage based capacity model vs parent
+        // queues' absolute configuration or vice versa, throw back an
+        // exception.
+        if (!queueName.equals("root") && getCapacity() != 0f
+            && !queue.getQueueResourceQuotas().getConfiguredMinResource()
+                .equals(Resources.none())) {
+          throw new IllegalArgumentException("Parent queue '" + getQueueName()
+              + "' and child queue '" + queue.getQueueName()
+              + "' should use either percentage based capacity"
+              + " configuration or absolute resource together.");
+        }
       }
+
       float delta = Math.abs(1.0f - childCapacities);  // crude way to check
       // allow capacities being set to 0, and enforce child 0 if parent is 0
-      if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || (
-          (queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
-        throw new IllegalArgumentException(
-            "Illegal" + " capacity of " + childCapacities
-                + " for children of queue " + queueName);
+      if ((minResDefaultLabel.equals(Resources.none())
+          && (queueCapacities.getCapacity() > 0) && (delta > PRECISION))
+          || ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
+        throw new IllegalArgumentException("Illegal" + " capacity of "
+            + childCapacities + " for children of queue " + queueName);
       }
       // check label capacities
       for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
         float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
         // check children's labels
         float sum = 0;
+        Resource minRes = Resources.createResource(0, 0);
+        Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
+            scheduler.getClusterResource());
         for (CSQueue queue : childQueues) {
           sum += queue.getQueueCapacities().getCapacity(nodeLabel);
+
+          // If any child queue of a label is using percentage based capacity
+          // model vs parent queues' absolute configuration or vice versa, throw
+          // back an exception
+          if (!queueName.equals("root") && !this.capacityConfigType
+              .equals(queue.getCapacityConfigType())) {
+            throw new IllegalArgumentException("Parent queue '" + getQueueName()
+                + "' and child queue '" + queue.getQueueName()
+                + "' should use either percentage based capacity"
+                + "configuration or absolute resource together for label:"
+                + nodeLabel);
+          }
+
+          // Accumulate all min/max resource configured for all child queues.
+          Resources.addTo(minRes, queue.getQueueResourceQuotas()
+              .getConfiguredMinResource(nodeLabel));
         }
-        if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
+        if ((minResDefaultLabel.equals(Resources.none()) && capacityByLabel > 0
+            && Math.abs(1.0f - sum) > PRECISION)
             || (capacityByLabel == 0) && (sum > 0)) {
           throw new IllegalArgumentException(
               "Illegal" + " capacity of " + sum + " for children of queue "
                   + queueName + " for label=" + nodeLabel);
         }
+
+        // Ensure that for each parent queue: parent.min-resource >=
+        // Σ(child.min-resource).
+        Resource parentMinResource = queueResourceQuotas
+            .getConfiguredMinResource(nodeLabel);
+        if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
+            resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
+          throw new IllegalArgumentException("Parent Queues" + " capacity: "
+              + parentMinResource + " is less than" + " to its children:"
+              + minRes + " for queue:" + queueName);
+        }
       }
 
       this.childQueues.clear();
@@ -687,11 +736,8 @@ public class ParentQueue extends AbstractCSQueue {
         child.getQueueResourceUsage().getUsed(nodePartition));
 
     // Get child's max resource
-    Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown(
-        resourceCalculator,
-        labelManager.getResourceByLabel(nodePartition, clusterResource),
-        child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition),
-        minimumAllocation);
+    Resource childConfiguredMaxResource = getEffectiveMaxCapacityDown(
+        nodePartition, minimumAllocation);
 
     // Child's limit should be capped by child configured max resource
     childLimit =
@@ -827,6 +873,14 @@ public class ParentQueue extends AbstractCSQueue {
       ResourceLimits resourceLimits) {
     try {
       writeLock.lock();
+
+      // Update effective capacity in all parent queue.
+      Set<String> configuredNodelabels = csContext.getConfiguration()
+          .getConfiguredNodeLabels(getQueuePath());
+      for (String label : configuredNodelabels) {
+        calculateEffectiveResourcesAndCapacity(label, clusterResource);
+      }
+
       // Update all children
       for (CSQueue childQueue : childQueues) {
         // Get ResourceLimits of child queue before assign containers
@@ -848,6 +902,110 @@ public class ParentQueue extends AbstractCSQueue {
     return true;
   }
 
+  /**
+   * Recomputes, for one node label, the effective min/max resources of every
+   * direct child queue. Children configured with ABSOLUTE_RESOURCE get their
+   * configured minimum, scaled down when together they ask for more than
+   * this queue has; all others are derived from their absolute capacities.
+   * @param label node label being updated
+   * @param clusterResource current cluster resource
+   */
+  private void calculateEffectiveResourcesAndCapacity(String label,
+      Resource clusterResource) {
+    // For root queue, ensure that max/min resource is updated to latest
+    // cluster resource.
+    boolean isRoot = getQueueName().equals("root");
+    Resource resourceByLabel = labelManager.getResourceByLabel(label,
+        clusterResource);
+    if (isRoot) {
+      queueResourceQuotas.setConfiguredMinResource(label, resourceByLabel);
+      queueResourceQuotas.setConfiguredMaxResource(label, resourceByLabel);
+      queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel);
+      queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel);
+    }
+
+    // Total configured min resources of direct children of queue
+    Resource configuredMinResources = Resource.newInstance(0L, 0);
+    for (CSQueue childQueue : getChildQueues()) {
+      Resources.addTo(configuredMinResources,
+          childQueue.getQueueResourceQuotas().getConfiguredMinResource(label));
+    }
+
+    // Factor to scale down effective resource: When cluster has sufficient
+    // resources, effective_min_resources will be same as configured
+    // min_resources. Root is compared against the label's resource (never
+    // scaled while that is none); other queues against their effective min.
+    Resource available = isRoot ? resourceByLabel
+        : queueResourceQuotas.getEffectiveMinResource(label);
+    float effectiveMinRatio = 1;
+    ResourceCalculator rc = this.csContext.getResourceCalculator();
+    if (!(isRoot && available.equals(Resources.none())) && Resources
+        .lessThan(rc, clusterResource, available, configuredMinResources)) {
+      effectiveMinRatio = Resources.divide(rc, clusterResource, available,
+          configuredMinResources);
+    }
+
+    // loop and do this for all child queues
+    for (CSQueue childQueue : getChildQueues()) {
+      Resource minResource = childQueue.getQueueResourceQuotas()
+          .getConfiguredMinResource(label);
+
+      // Update effective resource (min/max) to each child queue.
+      if (childQueue.getCapacityConfigType()
+          .equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
+        childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
+            Resources.multiply(minResource, effectiveMinRatio));
+
+        // Max resource of a queue should be a minimum of {configuredMaxRes,
+        // parentMaxRes}. parentMaxRes could be configured value. If it is not
+        // present, fall back to the parent's effective max resource; the root
+        // queue presumably has no parent (null), so it keeps its own value.
+        Resource parentMaxRes = queueResourceQuotas
+            .getConfiguredMaxResource(label);
+        if (parent != null && parentMaxRes.equals(Resources.none())) {
+          parentMaxRes = parent.getQueueResourceQuotas()
+              .getEffectiveMaxResource(label);
+        }
+
+        // Minimum of {childMaxResource, parentMaxRes}. However if
+        // childMaxResource is empty, consider parent's max resource alone.
+        Resource childMaxResource = childQueue.getQueueResourceQuotas()
+            .getConfiguredMaxResource(label);
+        Resource effMaxResource = Resources.min(resourceCalculator,
+            resourceByLabel, childMaxResource.equals(Resources.none())
+                ? parentMaxRes : childMaxResource,
+            parentMaxRes);
+        childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label,
+            Resources.clone(effMaxResource));
+      } else {
+        childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
+            Resources.multiply(resourceByLabel,
+                childQueue.getQueueCapacities().getAbsoluteCapacity(label)));
+        childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label,
+            Resources.multiply(resourceByLabel, childQueue.getQueueCapacities()
+                .getAbsoluteMaximumCapacity(label)));
+        childQueue.getQueueResourceQuotas().setEffectiveMinResourceUp(label,
+            Resources.multiplyAndNormalizeUp(rc, resourceByLabel,
+                childQueue.getQueueCapacities().getAbsoluteCapacity(label),
+                minimumAllocation));
+        childQueue.getQueueResourceQuotas().setEffectiveMaxResourceUp(label,
+            Resources.multiplyAndNormalizeUp(rc,
+                resourceByLabel, childQueue.getQueueCapacities()
+                    .getAbsoluteMaximumCapacity(label),
+                    minimumAllocation));
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating effective min resource for queue:"
+            + childQueue.getQueueName() + " as effMinResource="
+            + childQueue.getQueueResourceQuotas().getEffectiveMinResource(label)
+            + "and Updating effective max resource as effMaxResource="
+            + childQueue.getQueueResourceQuotas()
+                .getEffectiveMaxResource(label));
+      }
+    }
+  }
+
   @Override
   public List<CSQueue> getChildQueues() {
     try {
@@ -980,9 +1138,21 @@ public class ParentQueue extends AbstractCSQueue {
        * When this happens, we have to preempt killable container (on same or different
        * nodes) of parent queue to avoid violating parent's max resource.
        */
-      if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
-          < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
-        killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
+      if (!queueResourceQuotas.getEffectiveMaxResource(nodePartition)
+          .equals(Resources.none())) {
+        if (Resources.lessThan(resourceCalculator, clusterResource,
+            queueResourceQuotas.getEffectiveMaxResource(nodePartition),
+            queueUsage.getUsed(nodePartition))) {
+          killContainersToEnforceMaxQueueCapacity(nodePartition,
+              clusterResource);
+        }
+      } else {
+        if (getQueueCapacities()
+            .getAbsoluteMaximumCapacity(nodePartition) < getQueueCapacities()
+                .getAbsoluteUsedCapacity(nodePartition)) {
+          killContainersToEnforceMaxQueueCapacity(nodePartition,
+              clusterResource);
+        }
       }
     } finally {
       writeLock.unlock();
@@ -999,8 +1169,7 @@ public class ParentQueue extends AbstractCSQueue {
 
     Resource partitionResource = labelManager.getResourceByLabel(partition,
         null);
-    Resource maxResource = Resources.multiply(partitionResource,
-        getQueueCapacities().getAbsoluteMaximumCapacity(partition));
+    Resource maxResource = getEffectiveMaxCapacity(partition);
 
     while (Resources.greaterThan(resourceCalculator, partitionResource,
         queueUsage.getUsed(partition), maxResource)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba444cbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 33f30b0..efc20e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -686,10 +686,7 @@ public class UsersManager implements AbstractUsersManager {
      * * If we're running over capacity, then its (usedResources + required)
      * (which extra resources we are allocating)
      */
-    Resource queueCapacity = Resources.multiplyAndNormalizeUp(
-        resourceCalculator, partitionResource,
-        lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
-        lQueue.getMinimumAllocation());
+    Resource queueCapacity = lQueue.getEffectiveCapacityUp(nodePartition);
 
     /*
      * Assume we have required resource equals to minimumAllocation, this can


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message