Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E14FC10C3A for ; Mon, 16 Feb 2015 18:31:12 +0000 (UTC) Received: (qmail 6404 invoked by uid 500); 16 Feb 2015 18:31:09 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 6020 invoked by uid 500); 16 Feb 2015 18:31:09 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 5451 invoked by uid 99); 16 Feb 2015 18:31:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Feb 2015 18:31:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8AC6BE07D9; Mon, 16 Feb 2015 18:31:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhz@apache.org To: common-commits@hadoop.apache.org Date: Mon, 16 Feb 2015 18:31:15 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/50] [abbrv] hadoop git commit: YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track capacities-by-label. Contributed by Wangda Tan YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track capacities-by-label. Contributed by Wangda Tan Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f14f01be Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f14f01be Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f14f01be Branch: refs/heads/HDFS-7285 Commit: f14f01bebe74fc4a83e878778f6b51d2d9e18d36 Parents: 27447c5 Author: Jian He Authored: Thu Feb 12 14:58:09 2015 -0800 Committer: Zhe Zhang Committed: Mon Feb 16 10:29:47 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 2 + .../scheduler/capacity/AbstractCSQueue.java | 240 ++++++------------- .../scheduler/capacity/CSQueue.java | 49 +--- .../scheduler/capacity/CSQueueUtils.java | 160 +++++++++---- .../CapacitySchedulerConfiguration.java | 73 ++---- .../scheduler/capacity/LeafQueue.java | 196 ++++----------- .../scheduler/capacity/ParentQueue.java | 92 ++----- .../scheduler/capacity/PlanQueue.java | 13 +- .../scheduler/capacity/QueueCapacities.java | 68 +++++- .../scheduler/capacity/ReservationQueue.java | 10 +- .../scheduler/capacity/TestCSQueueUtils.java | 24 +- .../capacity/TestCapacityScheduler.java | 6 +- .../TestCapacitySchedulerNodeLabelUpdate.java | 3 +- .../scheduler/capacity/TestQueueCapacities.java | 2 +- .../scheduler/capacity/TestQueueParsing.java | 14 +- .../webapp/TestRMWebServicesCapacitySched.java | 4 +- 17 files changed, 412 insertions(+), 547 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8bc614d..b91281e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -281,6 +281,9 @@ Release 2.7.0 - UNRELEASED YARN-3181. FairScheduler: Fix up outdated findbugs issues. (kasha) + YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track + capacities-by-label. (Wangda Tan via jianhe) + OPTIMIZATIONS YARN-2990. FairScheduler's delay-scheduling always waits for node-local and http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 09a9d2e..70f1a71 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -194,6 +194,8 @@ + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 753fb14..eb7218b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -44,23 +44,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { CSQueue parent; final String queueName; - float capacity; - float maximumCapacity; - float absoluteCapacity; - float absoluteMaxCapacity; - float absoluteUsedCapacity = 0.0f; - - float usedCapacity = 0.0f; volatile int numContainers; - final Resource minimumAllocation; + Resource minimumAllocation; Resource maximumAllocation; QueueState state; final QueueMetrics metrics; @@ -70,10 +62,6 @@ public abstract class AbstractCSQueue implements CSQueue { Set accessibleLabels; RMNodeLabelsManager labelManager; String defaultLabelExpression; - Map absoluteCapacityByNodeLabels; - Map capacitiyByNodeLabels; - Map absoluteMaxCapacityByNodeLabels; - Map maxCapacityByNodeLabels; Map acls = new HashMap(); @@ -83,15 +71,17 @@ public abstract class AbstractCSQueue implements CSQueue { // Track resource usage-by-label like used-resource/pending-resource, etc. ResourceUsage queueUsage; + // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, + // etc. + QueueCapacities queueCapacities; + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private CapacitySchedulerContext csContext; + protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { - this.minimumAllocation = cs.getMinimumResourceCapability(); - this.maximumAllocation = cs.getMaximumResourceCapability(); this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; @@ -102,68 +92,55 @@ public abstract class AbstractCSQueue implements CSQueue { QueueMetrics.forQueue(getQueuePath(), parent, cs.getConfiguration().getEnableUserMetrics(), cs.getConf()); - - // get labels - this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath()); - this.defaultLabelExpression = cs.getConfiguration() - .getDefaultNodeLabelExpression(getQueuePath()); - // inherit from parent if labels not set - if (this.accessibleLabels == null && parent != null) { - this.accessibleLabels = parent.getAccessibleNodeLabels(); - } - SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager, - this.accessibleLabels); - - // inherit from parent if labels not set - if (this.defaultLabelExpression == null && parent != null - && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { - this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); - } - - // set capacity by labels - capacitiyByNodeLabels = - cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels, - labelManager); - - // set maximum capacity by labels - maxCapacityByNodeLabels = - cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), - accessibleLabels, labelManager); this.csContext = cs; + + // initialize ResourceUsage queueUsage = new ResourceUsage(); queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath()); - authorizer = YarnAuthorizationProvider.getInstance(cs.getConf()); + + // initialize QueueCapacities + queueCapacities = new QueueCapacities(parent == null); + } + + protected void setupConfigurableCapacities() { + CSQueueUtils.loadUpdateAndCheckCapacities( + getQueuePath(), + accessibleLabels, + csContext.getConfiguration(), + queueCapacities, + parent == null ? null : parent.getQueueCapacities(), + csContext.getRMContext().getNodeLabelManager()); } @Override public synchronized float getCapacity() { - return capacity; + return queueCapacities.getCapacity(); } @Override public synchronized float getAbsoluteCapacity() { - return absoluteCapacity; + return queueCapacities.getAbsoluteCapacity(); } @Override public float getAbsoluteMaximumCapacity() { - return absoluteMaxCapacity; + return queueCapacities.getAbsoluteMaximumCapacity(); } @Override public synchronized float getAbsoluteUsedCapacity() { - return absoluteUsedCapacity; + return queueCapacities.getAbsoluteUsedCapacity(); } @Override public float getMaximumCapacity() { - return maximumCapacity; + return queueCapacities.getMaximumCapacity(); } @Override public synchronized float getUsedCapacity() { - return usedCapacity; + return queueCapacities.getUsedCapacity(); } @Override @@ -216,12 +193,12 @@ public abstract class AbstractCSQueue implements CSQueue { @Override public synchronized void setUsedCapacity(float usedCapacity) { - this.usedCapacity = usedCapacity; + queueCapacities.setUsedCapacity(usedCapacity); } @Override public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { - this.absoluteUsedCapacity = absUsedCapacity; + queueCapacities.setAbsoluteUsedCapacity(absUsedCapacity); } /** @@ -230,21 +207,16 @@ public abstract class AbstractCSQueue implements CSQueue { */ synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkMaxCapacity(getQueueName(), + queueCapacities.getCapacity(), maximumCapacity); float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), + queueCapacities.getAbsoluteCapacity(), absMaxCapacity); - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absMaxCapacity; - } - - @Override - public float getAbsActualCapacity() { - // for now, simply return actual capacity = guaranteed capacity for parent - // queue - return absoluteCapacity; + queueCapacities.setMaximumCapacity(maximumCapacity); + queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); } @Override @@ -252,39 +224,39 @@ public abstract class AbstractCSQueue implements CSQueue { return defaultLabelExpression; } - synchronized void setupQueueConfigs(Resource clusterResource, float capacity, - float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, - QueueState state, Map acls, - Set labels, String defaultLabelExpression, - Map nodeLabelCapacities, - Map maximumNodeLabelCapacities, - boolean reservationContinueLooking, Resource maxAllocation) + synchronized void setupQueueConfigs(Resource clusterResource) throws IOException { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, - absoluteMaxCapacity); - - this.capacity = capacity; - this.absoluteCapacity = absoluteCapacity; - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absoluteMaxCapacity; + // get labels + this.accessibleLabels = + csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + this.defaultLabelExpression = csContext.getConfiguration() + .getDefaultNodeLabelExpression(getQueuePath()); - this.state = state; + // inherit from parent if labels not set + if (this.accessibleLabels == null && parent != null) { + this.accessibleLabels = parent.getAccessibleNodeLabels(); + } + SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager, + this.accessibleLabels); + + // inherit from parent if labels not set + if (this.defaultLabelExpression == null && parent != null + && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { + this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); + } - this.acls = acls; + // After we setup labels, we can setup capacities + setupConfigurableCapacities(); - // set labels - this.accessibleLabels = labels; + this.minimumAllocation = csContext.getMinimumResourceCapability(); + this.maximumAllocation = + csContext.getConfiguration().getMaximumAllocationPerQueue( + getQueuePath()); - // set label expression - this.defaultLabelExpression = defaultLabelExpression; + authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); - // copy node label capacity - this.capacitiyByNodeLabels = new HashMap(nodeLabelCapacities); - this.maxCapacityByNodeLabels = - new HashMap(maximumNodeLabelCapacities); + this.state = csContext.getConfiguration().getState(getQueuePath()); + this.acls = csContext.getConfiguration().getAcls(getQueuePath()); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -311,34 +283,19 @@ public abstract class AbstractCSQueue implements CSQueue { } } } - - // calculate absolute capacity by each node label - this.absoluteCapacityByNodeLabels = - CSQueueUtils.computeAbsoluteCapacityByNodeLabels( - this.capacitiyByNodeLabels, parent); - - // calculate maximum capacity by each node label - this.absoluteMaxCapacityByNodeLabels = - CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels( - maximumNodeLabelCapacities, parent); - - // check absoluteMaximumNodeLabelCapacities is valid - CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(), - absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels); - - this.reservationsContinueLooking = reservationContinueLooking; - this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + this.reservationsContinueLooking = csContext.getConfiguration() + .getReservationContinueLook(); - this.maximumAllocation = maxAllocation; + this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); } protected QueueInfo getQueueInfo() { QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setAccessibleNodeLabels(accessibleLabels); - queueInfo.setCapacity(capacity); - queueInfo.setMaximumCapacity(maximumCapacity); + queueInfo.setCapacity(queueCapacities.getCapacity()); + queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); queueInfo.setQueueState(state); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); @@ -389,51 +346,6 @@ public abstract class AbstractCSQueue implements CSQueue { } @Private - public float getCapacityByNodeLabel(String label) { - if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) { - if (null == parent) { - return 1f; - } - return getCapacity(); - } - - if (!capacitiyByNodeLabels.containsKey(label)) { - return 0f; - } else { - return capacitiyByNodeLabels.get(label); - } - } - - @Private - public float getAbsoluteCapacityByNodeLabel(String label) { - if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) { - if (null == parent) { - return 1f; - } - return getAbsoluteCapacity(); - } - - if (!absoluteCapacityByNodeLabels.containsKey(label)) { - return 0f; - } else { - return absoluteCapacityByNodeLabels.get(label); - } - } - - @Private - public float getAbsoluteMaximumCapacityByNodeLabel(String label) { - if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) { - return getAbsoluteMaximumCapacity(); - } - - if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) { - return 0f; - } else { - return absoluteMaxCapacityByNodeLabels.get(label); - } - } - - @Private public boolean getReservationContinueLooking() { return reservationsContinueLooking; } @@ -442,20 +354,20 @@ public abstract class AbstractCSQueue implements CSQueue { public Map getACLs() { return acls; } - + @Private - public Resource getUsedResourceByLabel(String nodeLabel) { - return queueUsage.getUsed(nodeLabel); + public boolean getPreemptionDisabled() { + return preemptionDisabled; } - @VisibleForTesting - public ResourceUsage getResourceUsage() { - return queueUsage; + @Private + public QueueCapacities getQueueCapacities() { + return queueCapacities; } - + @Private - public boolean getPreemptionDisabled() { - return preemptionDisabled; + public ResourceUsage getQueueResourceUsage() { + return queueUsage; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 46ee93c..5cf38c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -75,15 +76,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * @return configured queue capacity */ public float getCapacity(); - - /** - * Get actual capacity of the queue, this may be different from - * configured capacity when mis-config take place, like add labels to the - * cluster - * - * @return actual queue capacity - */ - public float getAbsActualCapacity(); /** * Get capacity of the parent of the queue as a function of the @@ -144,14 +136,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { public Resource getUsedResources(); /** - * Get the currently utilized resources which allocated at nodes with label - * specified - * - * @return used resources by the queue and it's children - */ - public Resource getUsedResourceByLabel(String nodeLabel); - - /** * Get the current run-state of the queue * @return current run-state */ @@ -279,31 +263,22 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { */ public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer container); - + /** - * Get absolute capacity by label of this queue can use - * @param nodeLabel - * @return absolute capacity by label of this queue can use + * Check whether disable_preemption property is set for this queue + * @return true if disable_preemption is set, false if not */ - public float getAbsoluteCapacityByNodeLabel(String nodeLabel); + public boolean getPreemptionDisabled(); /** - * Get absolute max capacity by label of this queue can use - * @param nodeLabel - * @return absolute capacity by label of this queue can use - */ - public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel); - - /** - * Get capacity by node label - * @param nodeLabel - * @return capacity by node label + * Get QueueCapacities of this queue + * @return queueCapacities */ - public float getCapacityByNodeLabel(String nodeLabel); - + public QueueCapacities getQueueCapacities(); + /** - * Check whether disable_preemption property is set for this queue - * @return true if disable_preemption is set, false if not + * Get ResourceUsage of this queue + * @return resourceUsage */ - public boolean getPreemptionDisabled(); + public ResourceUsage getQueueResourceUsage(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index f458057..865b0b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -34,6 +35,9 @@ class CSQueueUtils { final static float EPSILON = 0.0001f; + /* + * Used only by tests + */ public static void checkMaxCapacity(String queueName, float capacity, float maximumCapacity) { if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) { @@ -43,6 +47,9 @@ class CSQueueUtils { } } + /* + * Used only by tests + */ public static void checkAbsoluteCapacity(String queueName, float absCapacity, float absMaxCapacity) { if (absMaxCapacity < (absCapacity - EPSILON)) { @@ -53,19 +60,33 @@ class CSQueueUtils { } } - public static void checkAbsoluteCapacitiesByLabel(String queueName, - Map absCapacities, - Map absMaximumCapacities) { - for (Entry entry : absCapacities.entrySet()) { - String label = entry.getKey(); - float absCapacity = entry.getValue(); - float absMaxCapacity = absMaximumCapacities.get(label); - if (absMaxCapacity < (absCapacity - EPSILON)) { - throw new IllegalArgumentException("Illegal call to setMaxCapacity. " - + "Queue '" + queueName + "' has " + "an absolute capacity (" - + absCapacity + ") greater than " - + "its absolute maximumCapacity (" + absMaxCapacity + ") of label=" - + label); + /** + * Check sanity of capacities: + * - capacity <= maxCapacity + * - absCapacity <= absMaximumCapacity + */ + private static void capacitiesSanityCheck(String queueName, + QueueCapacities queueCapacities) { + for (String label : queueCapacities.getExistingNodeLabels()) { + float capacity = queueCapacities.getCapacity(label); + float maximumCapacity = queueCapacities.getMaximumCapacity(label); + if (capacity > maximumCapacity) { + throw new IllegalArgumentException("Illegal queue capacity setting, " + + "(capacity=" + capacity + ") > (maximum-capacity=" + + maximumCapacity + "). When label=[" + label + "]"); + } + + // Actually, this may not needed since we have verified capacity <= + // maximumCapacity. And the way we compute absolute capacity (abs(x) = + // cap(x) * cap(x.parent) * ...) is a monotone increasing function. But + // just keep it here to make sure our compute abs capacity method works + // correctly. + float absCapacity = queueCapacities.getAbsoluteCapacity(label); + float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label); + if (absCapacity > absMaxCapacity) { + throw new IllegalArgumentException("Illegal queue capacity setting, " + + "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity=" + + absMaxCapacity + "). When label=[" + label + "]"); } } } @@ -77,37 +98,94 @@ class CSQueueUtils { return (parentAbsMaxCapacity * maximumCapacity); } - public static Map computeAbsoluteCapacityByNodeLabels( - Map nodeLabelToCapacities, CSQueue parent) { - if (parent == null) { - return nodeLabelToCapacities; + /** + * This method intends to be used by ReservationQueue, ReservationQueue will + * not appear in configuration file, so we shouldn't do load capacities + * settings in configuration for reservation queue. + */ + public static void updateAndCheckCapacitiesByLabel(String queuePath, + QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { + updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); + + capacitiesSanityCheck(queuePath, queueCapacities); + } + + /** + * Do following steps for capacities + * - Load capacities from configuration + * - Update absolute capacities for new capacities + * - Check if capacities/absolute-capacities legal + */ + public static void loadUpdateAndCheckCapacities(String queuePath, + Set accessibleLabels, CapacitySchedulerConfiguration csConf, + QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities, + RMNodeLabelsManager nlm) { + loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels, nlm, + queueCapacities, csConf); + + updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); + + capacitiesSanityCheck(queuePath, queueCapacities); + } + + // Considered NO_LABEL, ANY and null cases + private static Set normalizeAccessibleNodeLabels(Set labels, + RMNodeLabelsManager mgr) { + Set accessibleLabels = new HashSet(); + if (labels != null) { + accessibleLabels.addAll(labels); } - - Map absoluteCapacityByNodeLabels = - new HashMap(); - for (Entry entry : nodeLabelToCapacities.entrySet()) { - String label = entry.getKey(); - float capacity = entry.getValue(); - absoluteCapacityByNodeLabels.put(label, - capacity * parent.getAbsoluteCapacityByNodeLabel(label)); + if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) { + accessibleLabels.addAll(mgr.getClusterNodeLabels()); } - return absoluteCapacityByNodeLabels; + accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL); + + return accessibleLabels; } - public static Map computeAbsoluteMaxCapacityByNodeLabels( - Map maximumNodeLabelToCapacities, CSQueue parent) { - if (parent == null) { - return maximumNodeLabelToCapacities; + private static void loadCapacitiesByLabelsFromConf(String queuePath, + Set labels, RMNodeLabelsManager mgr, + QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) { + queueCapacities.clearConfigurableFields(); + labels = normalizeAccessibleNodeLabels(labels, mgr); + + for (String label : labels) { + if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { + queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL, + csConf.getNonLabeledQueueCapacity(queuePath) / 100); + queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL, + csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100); + } else { + queueCapacities.setCapacity(label, + csConf.getLabeledQueueCapacity(queuePath, label) / 100); + queueCapacities.setMaximumCapacity(label, + csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100); + } } - Map absoluteMaxCapacityByNodeLabels = - new HashMap(); - for (Entry entry : maximumNodeLabelToCapacities.entrySet()) { - String label = entry.getKey(); - float maxCapacity = entry.getValue(); - absoluteMaxCapacityByNodeLabels.put(label, - maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label)); + } + + // Set absolute capacities for {capacity, maximum-capacity} + private static void updateAbsoluteCapacitiesByNodeLabels( + QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { + for (String label : queueCapacities.getExistingNodeLabels()) { + float capacity = queueCapacities.getCapacity(label); + if (capacity > 0f) { + queueCapacities.setAbsoluteCapacity( + label, + capacity + * (parentQueueCapacities == null ? 1 : parentQueueCapacities + .getAbsoluteCapacity(label))); + } + + float maxCapacity = queueCapacities.getMaximumCapacity(label); + if (maxCapacity > 0f) { + queueCapacities.setAbsoluteMaximumCapacity( + label, + maxCapacity + * (parentQueueCapacities == null ? 1 : parentQueueCapacities + .getAbsoluteMaximumCapacity(label))); + } } - return absoluteMaxCapacityByNodeLabels; } @Lock(CSQueue.class) http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b49a60a..3528c2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -278,6 +278,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur } private String getNodeLabelPrefix(String queue, String label) { + if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { + return getQueuePrefix(queue); + } return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT; } @@ -316,7 +319,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur getMaximumApplicationMasterResourcePercent()); } - public float getCapacity(String queue) { + public float getNonLabeledQueueCapacity(String queue) { float capacity = queue.equals("root") ? 100.0f : getFloat( getQueuePrefix(queue) + CAPACITY, UNDEFINED); if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) { @@ -338,7 +341,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur ", capacity=" + capacity); } - public float getMaximumCapacity(String queue) { + public float getNonLabeledQueueMaximumCapacity(String queue) { float maxCapacity = getFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY, MAXIMUM_CAPACITY_VALUE); maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? @@ -442,57 +445,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return Collections.unmodifiableSet(set); } - public Map getNodeLabelCapacities(String queue, - Set labels, RMNodeLabelsManager mgr) { - Map nodeLabelCapacities = new HashMap(); - - if (labels == null) { - return nodeLabelCapacities; + private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, + float defaultValue) { + String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; + float capacity = getFloat(capacityPropertyName, defaultValue); + if (capacity < MINIMUM_CAPACITY_VALUE + || capacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal capacity of " + capacity + + " for node-label=" + label + " in queue=" + queue + + ", valid capacity should in range of [0, 100]."); } - - for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr - .getClusterNodeLabels() : labels) { - String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY; - float capacity = getFloat(capacityPropertyName, 0f); - if (capacity < MINIMUM_CAPACITY_VALUE - || capacity > MAXIMUM_CAPACITY_VALUE) { - throw new IllegalArgumentException("Illegal capacity of " + capacity - + " for node-label=" + label + " in queue=" + queue - + ", valid capacity should in range of [0, 100]."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("CSConf - getCapacityOfLabel: prefix=" - + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); - } - - nodeLabelCapacities.put(label, capacity / 100f); + if (LOG.isDebugEnabled()) { + LOG.debug("CSConf - getCapacityOfLabel: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); } - return nodeLabelCapacities; + return capacity; } - public Map getMaximumNodeLabelCapacities(String queue, - Set labels, RMNodeLabelsManager mgr) { - Map maximumNodeLabelCapacities = new HashMap(); - if (labels == null) { - return maximumNodeLabelCapacities; - } - - for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr - .getClusterNodeLabels() : labels) { - float maxCapacity = - getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, - 100f); - if (maxCapacity < MINIMUM_CAPACITY_VALUE - || maxCapacity > MAXIMUM_CAPACITY_VALUE) { - throw new IllegalArgumentException("Illegal " + "capacity of " - + maxCapacity + " for label=" + label + " in queue=" + queue); - } - LOG.debug("CSConf - getCapacityOfLabel: prefix=" - + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity); - - maximumNodeLabelCapacities.put(label, maxCapacity / 100f); - } - return maximumNodeLabelCapacities; + public float getLabeledQueueCapacity(String queue, String label) { + return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f); + } + + public float getLabeledQueueMaximumCapacity(String queue, String label) { + return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f); } public String getDefaultNodeLabelExpression(String queue) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5258223..38d4712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -122,49 +122,7 @@ public class LeafQueue extends AbstractCSQueue { super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); - this.minimumAllocationFactor = - Resources.ratio(resourceCalculator, - Resources.subtract(maximumAllocation, minimumAllocation), - maximumAllocation); - - float capacity = getCapacityFromConf(); - float absoluteCapacity = parent.getAbsoluteCapacity() * capacity; - - float maximumCapacity = - (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100; - float absoluteMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - - int userLimit = cs.getConfiguration().getUserLimit(getQueuePath()); - float userLimitFactor = - cs.getConfiguration().getUserLimitFactor(getQueuePath()); - - int maxApplications = - cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath()); - if (maxApplications < 0) { - int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); - maxApplications = (int)(maxSystemApps * absoluteCapacity); - } - maxApplicationsPerUser = - (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor); - - float maxAMResourcePerQueuePercent = cs.getConfiguration() - .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); - - QueueState state = cs.getConfiguration().getState(getQueuePath()); - - Map acls = - cs.getConfiguration().getAcls(getQueuePath()); - - setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, - maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels, - defaultLabelExpression, this.capacitiyByNodeLabels, - this.maxCapacityByNodeLabels, - cs.getConfiguration().getReservationContinueLook(), - cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath())); + this.activeUsersManager = new ActiveUsersManager(metrics); if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName @@ -176,35 +134,13 @@ public class LeafQueue extends AbstractCSQueue { this.pendingApplications = new TreeSet(applicationComparator); this.activeApplications = new TreeSet(applicationComparator); + + setupQueueConfigs(cs.getClusterResource()); } - - // externalizing in method, to allow overriding - protected float getCapacityFromConf() { - return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; - } - - protected synchronized void setupQueueConfigs( - Resource clusterResource, - float capacity, float absoluteCapacity, - float maximumCapacity, float absoluteMaxCapacity, - int userLimit, float userLimitFactor, - int maxApplications, float maxAMResourcePerQueuePercent, - int maxApplicationsPerUser, QueueState state, - Map acls, int nodeLocalityDelay, - Set labels, String defaultLabelExpression, - Map capacitieByLabel, - Map maximumCapacitiesByLabel, - boolean revervationContinueLooking, - Resource maxAllocation) throws IOException { - super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, labels, - defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel, - revervationContinueLooking, maxAllocation); - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absCapacity = getParent().getAbsoluteCapacity() * capacity; - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity, - absoluteMaxCapacity); + + protected synchronized void setupQueueConfigs(Resource clusterResource) + throws IOException { + super.setupQueueConfigs(clusterResource); this.lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); @@ -214,16 +150,24 @@ public class LeafQueue extends AbstractCSQueue { // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, absoluteMaxCapacity); + updateHeadroomInfo(clusterResource, + queueCapacities.getAbsoluteMaximumCapacity()); - this.absoluteCapacity = absCapacity; + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + userLimit = conf.getUserLimit(getQueuePath()); + userLimitFactor = conf.getUserLimitFactor(getQueuePath()); - this.userLimit = userLimit; - this.userLimitFactor = userLimitFactor; - - this.maxApplications = maxApplications; - this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; - this.maxApplicationsPerUser = maxApplicationsPerUser; + maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); + if (maxApplications < 0) { + int maxSystemApps = conf.getMaximumSystemApplications(); + maxApplications = + (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); + } + maxApplicationsPerUser = + (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor); + + maxAMResourcePerQueuePercent = + conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, this.defaultLabelExpression)) { @@ -239,7 +183,7 @@ public class LeafQueue extends AbstractCSQueue { getAccessibleNodeLabels().iterator(), ','))); } - this.nodeLocalityDelay = nodeLocalityDelay; + nodeLocalityDelay = conf.getNodeLocalityDelay(); // re-init this since max allocation could have changed this.minimumAllocationFactor = @@ -253,21 +197,21 @@ public class LeafQueue extends AbstractCSQueue { } StringBuilder labelStrBuilder = new StringBuilder(); - if (labels != null) { - for (String s : labels) { + if (accessibleLabels != null) { + for (String s : accessibleLabels) { labelStrBuilder.append(s); labelStrBuilder.append(","); } } LOG.info("Initializing " + queueName + "\n" + - "capacity = " + capacity + + "capacity = " + queueCapacities.getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + - "asboluteCapacity = " + absoluteCapacity + + "asboluteCapacity = " + queueCapacities.getAbsoluteCapacity() + " [= parentAbsoluteCapacity * capacity ]" + "\n" + - "maxCapacity = " + maximumCapacity + + "maxCapacity = " + queueCapacities.getMaximumCapacity() + " [= configuredMaxCapacity ]" + "\n" + - "absoluteMaxCapacity = " + absoluteMaxCapacity + + "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() + " [= 1.0 maximumCapacity undefined, " + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + "\n" + @@ -282,7 +226,7 @@ public class LeafQueue extends AbstractCSQueue { "maxApplicationsPerUser = " + maxApplicationsPerUser + " [= (int)(maxApplications * (userLimit / 100.0f) * " + "userLimitFactor) ]" + "\n" + - "usedCapacity = " + usedCapacity + + "usedCapacity = " + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + "absoluteUsedCapacity = " + absoluteUsedCapacity + @@ -441,8 +385,8 @@ public class LeafQueue extends AbstractCSQueue { public String toString() { return queueName + ": " + - "capacity=" + capacity + ", " + - "absoluteCapacity=" + absoluteCapacity + ", " + + "capacity=" + queueCapacities.getCapacity() + ", " + + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + "usedResources=" + queueUsage.getUsed() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + @@ -505,23 +449,7 @@ public class LeafQueue extends AbstractCSQueue { + ", trying to set it to: " + newMax); } - setupQueueConfigs( - clusterResource, - newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, - newlyParsedLeafQueue.maximumCapacity, - newlyParsedLeafQueue.absoluteMaxCapacity, - newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, - newlyParsedLeafQueue.maxApplications, - newlyParsedLeafQueue.maxAMResourcePerQueuePercent, - newlyParsedLeafQueue.getMaxApplicationsPerUser(), - newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, - newlyParsedLeafQueue.getNodeLocalityDelay(), - newlyParsedLeafQueue.accessibleLabels, - newlyParsedLeafQueue.defaultLabelExpression, - newlyParsedLeafQueue.capacitiyByNodeLabels, - newlyParsedLeafQueue.maxCapacityByNodeLabels, - newlyParsedLeafQueue.reservationsContinueLooking, - newlyParsedLeafQueue.getMaximumAllocation()); + setupQueueConfigs(clusterResource); // queue metrics are updated, more resource may be available // activate the pending applications if possible @@ -1034,7 +962,8 @@ public class LeafQueue extends AbstractCSQueue { application.getCurrentReservation()), labelManager.getResourceByLabel(label, clusterResource)); - if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) { + if (potentialNewWithoutReservedCapacity <= queueCapacities + .getAbsoluteMaximumCapacity()) { if (LOG.isDebugEnabled()) { LOG.debug("try to use reserved: " + getQueueName() @@ -1048,8 +977,9 @@ public class LeafQueue extends AbstractCSQueue { + Resources.divide(resourceCalculator, clusterResource, queueUsage.getUsed(), clusterResource) + " required " + required + " potentialNewWithoutReservedCapacity: " - + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " - + absoluteMaxCapacity + ")"); + + potentialNewWithoutReservedCapacity + " ( " + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity() + ")"); } // we could potentially use this node instead of reserved node return true; @@ -1058,7 +988,8 @@ public class LeafQueue extends AbstractCSQueue { // Otherwise, if any of the label of this node beyond queue limit, we // cannot allocate on this node. Consider a small epsilon here. - if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) { + if (potentialNewCapacity > queueCapacities + .getAbsoluteMaximumCapacity(label) + 1e-4) { canAssign = false; break; } @@ -1073,7 +1004,8 @@ public class LeafQueue extends AbstractCSQueue { queueUsage.getUsed(label), labelManager.getResourceByLabel(label, clusterResource)) + " potentialNewCapacity: " + potentialNewCapacity + " ( " - + " max-capacity: " + absoluteMaxCapacity + ")"); + + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity() + + ")"); } } @@ -1162,7 +1094,7 @@ public class LeafQueue extends AbstractCSQueue { Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager.getResourceByLabel(firstLabel, clusterResource), - getAbsoluteCapacityByNodeLabel(firstLabel), + queueCapacities.getAbsoluteCapacity(firstLabel), minimumAllocation)); } else { // else there's no label on request, just to use absolute capacity as @@ -1170,7 +1102,7 @@ public class LeafQueue extends AbstractCSQueue { queueCapacity = Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource), - absoluteCapacity, minimumAllocation); + queueCapacities.getAbsoluteCapacity(), minimumAllocation); } // Allow progress for queues with miniscule capacity @@ -1815,12 +1747,9 @@ public class LeafQueue extends AbstractCSQueue { } private void updateAbsoluteCapacityResource(Resource clusterResource) { - - absoluteCapacityResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - clusterResource, - absoluteCapacity, minimumAllocation); - + absoluteCapacityResource = + Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource, + queueCapacities.getAbsoluteCapacity(), minimumAllocation); } @Override @@ -1831,7 +1760,8 @@ public class LeafQueue extends AbstractCSQueue { // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, absoluteMaxCapacity); + updateHeadroomInfo(clusterResource, + queueCapacities.getAbsoluteMaximumCapacity()); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -2004,35 +1934,13 @@ public class LeafQueue extends AbstractCSQueue { getParent().detachContainer(clusterResource, application, rmContainer); } } - - @Override - public float getAbsActualCapacity() { - //? Is this actually used by anything at present? - // There is a findbugs warning -re lastClusterResource (now excluded), - // when this is used, verify that the access is mt correct and remove - // the findbugs exclusion if possible - if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - lastClusterResource, Resources.none())) { - return absoluteCapacity; - } - - Resource resourceRespectLabels = - labelManager == null ? lastClusterResource : labelManager - .getQueueResource(queueName, accessibleLabels, lastClusterResource); - float absActualCapacity = - Resources.divide(resourceCalculator, lastClusterResource, - resourceRespectLabels, lastClusterResource); - - return absActualCapacity > absoluteCapacity ? absoluteCapacity - : absActualCapacity; - } public void setCapacity(float capacity) { - this.capacity = capacity; + queueCapacities.setCapacity(capacity); } public void setAbsoluteCapacity(float absoluteCapacity) { - this.absoluteCapacity = absoluteCapacity; + queueCapacities.setAbsoluteCapacity(absoluteCapacity); } public void setMaxApplications(int maxApplications) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 29a8ba3..a26b0aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -87,7 +87,7 @@ public class ParentQueue extends AbstractCSQueue { this.rootQueue = (parent == null); - float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath()); + float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath()); if (rootQueue && (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) { @@ -95,46 +95,20 @@ public class ParentQueue extends AbstractCSQueue { "capacity of " + rawCapacity + " for queue " + queueName + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - - float capacity = (float) rawCapacity / 100; - float parentAbsoluteCapacity = - (rootQueue) ? 1.0f : parent.getAbsoluteCapacity(); - float absoluteCapacity = parentAbsoluteCapacity * capacity; - - float maximumCapacity = - (float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100; - float absoluteMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - - QueueState state = cs.getConfiguration().getState(getQueuePath()); - - Map acls = - cs.getConfiguration().getAcls(getQueuePath()); - - setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, - defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels, - cs.getConfiguration().getReservationContinueLook()); this.childQueues = new TreeSet(queueComparator); + + setupQueueConfigs(cs.getClusterResource()); LOG.info("Initialized parent-queue " + queueName + " name=" + queueName + ", fullname=" + getQueuePath()); } - synchronized void setupQueueConfigs(Resource clusterResource, float capacity, - float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, - QueueState state, Map acls, - Set accessibleLabels, String defaultLabelExpression, - Map nodeLabelCapacities, - Map maximumCapacitiesByLabel, - boolean reservationContinueLooking) throws IOException { - super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, - defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel, - reservationContinueLooking, maximumAllocation); - StringBuilder aclsString = new StringBuilder(); + synchronized void setupQueueConfigs(Resource clusterResource) + throws IOException { + super.setupQueueConfigs(clusterResource); + StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } @@ -148,10 +122,10 @@ public class ParentQueue extends AbstractCSQueue { } LOG.info(queueName + - ", capacity=" + capacity + - ", asboluteCapacity=" + absoluteCapacity + - ", maxCapacity=" + maximumCapacity + - ", asboluteMaxCapacity=" + absoluteMaxCapacity + + ", capacity=" + this.queueCapacities.getCapacity() + + ", asboluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + + ", asboluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + @@ -167,19 +141,19 @@ public class ParentQueue extends AbstractCSQueue { } float delta = Math.abs(1.0f - childCapacities); // crude way to check // allow capacities being set to 0, and enforce child 0 if parent is 0 - if (((capacity > 0) && (delta > PRECISION)) || - ((capacity == 0) && (childCapacities > 0))) { + if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || + ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { throw new IllegalArgumentException("Illegal" + " capacity of " + childCapacities + " for children of queue " + queueName); } // check label capacities for (String nodeLabel : labelManager.getClusterNodeLabels()) { - float capacityByLabel = getCapacityByNodeLabel(nodeLabel); + float capacityByLabel = queueCapacities.getCapacity(nodeLabel); // check children's labels float sum = 0; for (CSQueue queue : childQueues) { - sum += queue.getCapacityByNodeLabel(nodeLabel); + sum += queue.getQueueCapacities().getCapacity(nodeLabel); } if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) || (capacityByLabel == 0) && (sum > 0)) { @@ -255,8 +229,8 @@ public class ParentQueue extends AbstractCSQueue { public String toString() { return queueName + ": " + "numChildQueue= " + childQueues.size() + ", " + - "capacity=" + capacity + ", " + - "absoluteCapacity=" + absoluteCapacity + ", " + + "capacity=" + queueCapacities.getCapacity() + ", " + + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + "usedResources=" + queueUsage.getUsed() + "usedCapacity=" + getUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + @@ -264,9 +238,8 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void reinitialize( - CSQueue newlyParsedQueue, Resource clusterResource) - throws IOException { + public synchronized void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { // Sanity check if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { @@ -277,18 +250,7 @@ public class ParentQueue extends AbstractCSQueue { ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue; // Set new configs - setupQueueConfigs(clusterResource, - newlyParsedParentQueue.capacity, - newlyParsedParentQueue.absoluteCapacity, - newlyParsedParentQueue.maximumCapacity, - newlyParsedParentQueue.absoluteMaxCapacity, - newlyParsedParentQueue.state, - newlyParsedParentQueue.acls, - newlyParsedParentQueue.accessibleLabels, - newlyParsedParentQueue.defaultLabelExpression, - newlyParsedParentQueue.capacitiyByNodeLabels, - newlyParsedParentQueue.maxCapacityByNodeLabels, - newlyParsedParentQueue.reservationsContinueLooking); + setupQueueConfigs(clusterResource); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! @@ -513,7 +475,7 @@ public class ParentQueue extends AbstractCSQueue { labelManager.getResourceByLabel(label, clusterResource)); // if any of the label doesn't beyond limit, we can allocate on this node if (currentAbsoluteLabelUsedCapacity >= - getAbsoluteMaximumCapacityByNodeLabel(label)) { + queueCapacities.getAbsoluteMaximumCapacity(label)) { if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " used=" + queueUsage.getUsed() + " current-capacity (" + queueUsage.getUsed(label) + ") " @@ -541,7 +503,8 @@ public class ParentQueue extends AbstractCSQueue { Resources.subtract(queueUsage.getUsed(), reservedResources), clusterResource); - if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { + if (capacityWithoutReservedCapacity <= queueCapacities + .getAbsoluteMaximumCapacity()) { if (LOG.isDebugEnabled()) { LOG.debug("parent: try to use reserved: " + getQueueName() + " usedResources: " + queueUsage.getUsed().getMemory() @@ -551,7 +514,7 @@ public class ParentQueue extends AbstractCSQueue { / clusterResource.getMemory() + " potentialNewWithoutReservedCapacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: " - + absoluteMaxCapacity + ")"); + + queueCapacities.getAbsoluteMaximumCapacity() + ")"); } // we could potentially use this node instead of reserved node return true; @@ -761,13 +724,6 @@ public class ParentQueue extends AbstractCSQueue { } } } - - @Override - public float getAbsActualCapacity() { - // for now, simply return actual capacity = guaranteed capacity for parent - // queue - return absoluteCapacity; - } public synchronized int getNumApplications() { return numApplications; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index f8b11eb..7b53ad5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import org.apache.hadoop.yarn.api.records.Resource; @@ -99,16 +97,7 @@ public class PlanQueue extends ParentQueue { } // Set new configs - setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(), - newlyParsedParentQueue.getAbsoluteCapacity(), - newlyParsedParentQueue.getMaximumCapacity(), - newlyParsedParentQueue.getAbsoluteMaximumCapacity(), - newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(), - newlyParsedParentQueue.accessibleLabels, - newlyParsedParentQueue.defaultLabelExpression, - newlyParsedParentQueue.capacitiyByNodeLabels, - newlyParsedParentQueue.maxCapacityByNodeLabels, - newlyParsedParentQueue.getReservationContinueLooking()); + setupQueueConfigs(clusterResource); updateQuotas(newlyParsedParentQueue.userLimit, newlyParsedParentQueue.userLimitFactor, http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index a0e6d8c..962a636 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -19,12 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; + +import com.google.common.collect.Sets; public class QueueCapacities { private static final String NL = CommonNodeLabelsManager.NO_LABEL; @@ -32,13 +38,15 @@ public class QueueCapacities { private Map capacitiesMap; private ReadLock readLock; private WriteLock writeLock; + private final boolean isRoot; - public QueueCapacities() { + public QueueCapacities(boolean isRoot) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); capacitiesMap = new HashMap(); + this.isRoot = isRoot; } // Usage enum here to make implement cleaner @@ -58,6 +66,18 @@ public class QueueCapacities { public Capacities() { capacitiesArr = new float[CapacityType.values().length]; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{used=" + capacitiesArr[0] + "%, "); + sb.append("abs_used=" + capacitiesArr[1] + "%, "); + sb.append("max_cap=" + capacitiesArr[2] + "%, "); + sb.append("abs_max_cap=" + capacitiesArr[3] + "%, "); + sb.append("cap=" + capacitiesArr[4] + "%, "); + sb.append("abs_cap=" + capacitiesArr[5] + "%}"); + return sb.toString(); + } } private float _get(String label, CapacityType type) { @@ -127,6 +147,10 @@ public class QueueCapacities { } public float getCapacity(String label) { + if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL) && isRoot) { + return 1f; + } + return _get(label, CapacityType.CAP); } @@ -144,6 +168,9 @@ public class QueueCapacities { } public float getAbsoluteCapacity(String label) { + if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL) && isRoot) { + return 1f; + } return _get(label, CapacityType.ABS_CAP); } @@ -188,4 +215,43 @@ public class QueueCapacities { public void setAbsoluteMaximumCapacity(String label, float value) { _set(label, CapacityType.ABS_MAX_CAP, value); } + + /** + * Clear configurable fields, like + * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue + * reinitialize, when we reinitialize a queue, we will first clear all + * configurable fields, and load new values + */ + public void clearConfigurableFields() { + try { + writeLock.lock(); + for (String label : capacitiesMap.keySet()) { + _set(label, CapacityType.CAP, 0); + _set(label, CapacityType.MAX_CAP, 0); + _set(label, CapacityType.ABS_CAP, 0); + _set(label, CapacityType.ABS_MAX_CAP, 0); + } + } finally { + writeLock.unlock(); + } + } + + public Set getExistingNodeLabels() { + try { + readLock.lock(); + return new HashSet(capacitiesMap.keySet()); + } finally { + readLock.unlock(); + } + } + + @Override + public String toString() { + try { + readLock.lock(); + return this.capacitiesMap.toString(); + } finally { + readLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index c4424b5..a8d17cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -62,11 +62,11 @@ public class ReservationQueue extends LeafQueue { throw new IOException("Trying to reinitialize " + getQueuePath() + " from " + newlyParsedQueue.getQueuePath()); } + super.reinitialize(newlyParsedQueue, clusterResource); CSQueueUtils.updateQueueStatistics( parent.schedulerContext.getResourceCalculator(), newlyParsedQueue, parent, parent.schedulerContext.getClusterResource(), parent.schedulerContext.getMinimumResourceCapability()); - super.reinitialize(newlyParsedQueue, clusterResource); updateQuotas(parent.getUserLimitForReservation(), parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), @@ -108,9 +108,9 @@ public class ReservationQueue extends LeafQueue { maxApplicationsPerUser = maxAppsPerUserForReservation; } - // used by the super constructor, we initialize to zero - protected float getCapacityFromConf() { - return 0f; + @Override + protected void setupConfigurableCapacities() { + CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), + queueCapacities, parent == null ? null : parent.getQueueCapacities()); } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java index d643c9d..5135ba9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java @@ -202,33 +202,33 @@ public class TestCSQueueUtils { LOG.info("t2 l2q2 " + result); //some usage, but below the base capacity - root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.4f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //usage gt base on parent sibling - root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); + root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); + l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //same as last, but with usage also on direct parent - root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //add to direct sibling, below the threshold of effect at present - root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); @@ -236,9 +236,9 @@ public class TestCSQueueUtils { //add to direct sibling, now above the threshold of effect //(it's cumulative with prior tests) - root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.1f, result, 0.000001f); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 38d9d27..fabf47d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -389,11 +389,11 @@ public class TestCapacityScheduler { public void testMaximumCapacitySetup() { float delta = 0.0000001f; CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); - assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta); + assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta); conf.setMaximumCapacity(A, 50.0f); - assertEquals(50.0f, conf.getMaximumCapacity(A),delta); + assertEquals(50.0f, conf.getNonLabeledQueueMaximumCapacity(A),delta); conf.setMaximumCapacity(A, -1); - assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta); + assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 923c0a3..1e339d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -91,7 +91,8 @@ public class TestCapacitySchedulerNodeLabelUpdate { String label) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); CSQueue queue = scheduler.getQueue(queueName); - Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory()); + Assert.assertEquals(memory, queue.getQueueResourceUsage().getUsed(label) + .getMemory()); } @Test (timeout = 30000) http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java index 6d2a421..d543f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java @@ -99,7 +99,7 @@ public class TestQueueCapacities { } private void internalTestModifyAndRead(String label) throws Exception { - QueueCapacities qc = new QueueCapacities(); + QueueCapacities qc = new QueueCapacities(false); // First get returns 0 always Assert.assertEquals(0f, get(qc, suffix, label), 1e-8); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index f821e64..34f7c2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -468,18 +468,18 @@ public class TestQueueParsing { // check capacity of A2 CSQueue qA2 = capacityScheduler.getQueue("a2"); Assert.assertEquals(0.7, qA2.getCapacity(), DELTA); - Assert.assertEquals(0.5, qA2.getCapacityByNodeLabel("red"), DELTA); + Assert.assertEquals(0.5, qA2.getQueueCapacities().getCapacity("red"), DELTA); Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA); - Assert.assertEquals(0.25, qA2.getAbsoluteCapacityByNodeLabel("red"), DELTA); + Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA); Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA); - Assert.assertEquals(0.3, qA2.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA); + Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA); // check capacity of B3 CSQueue qB3 = capacityScheduler.getQueue("b3"); Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA); - Assert.assertEquals(0.125, qB3.getAbsoluteCapacityByNodeLabel("red"), DELTA); + Assert.assertEquals(0.125, qB3.getQueueCapacities().getAbsoluteCapacity("red"), DELTA); Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA); - Assert.assertEquals(1, qB3.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA); + Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA); } private void @@ -691,8 +691,8 @@ public class TestQueueParsing { // check root queue's capacity by label -- they should be all zero CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT); - Assert.assertEquals(0, root.getCapacityByNodeLabel("red"), DELTA); - Assert.assertEquals(0, root.getCapacityByNodeLabel("blue"), DELTA); + Assert.assertEquals(0, root.getQueueCapacities().getCapacity("red"), DELTA); + Assert.assertEquals(0, root.getQueueCapacities().getCapacity("blue"), DELTA); CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f14f01be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index 94040b5..ba5c73b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -395,9 +395,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase { String qshortName = qArr[qArr.length - 1]; assertEquals("usedCapacity doesn't match", 0, info.usedCapacity, 1e-3f); - assertEquals("capacity doesn't match", csConf.getCapacity(q), + assertEquals("capacity doesn't match", csConf.getNonLabeledQueueCapacity(q), info.capacity, 1e-3f); - float expectCapacity = csConf.getMaximumCapacity(q); + float expectCapacity = csConf.getNonLabeledQueueMaximumCapacity(q); float expectAbsMaxCapacity = parentAbsMaxCapacity * (info.maxCapacity/100); if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) { expectCapacity = 100;