Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1DB211073C for ; Tue, 3 Mar 2015 19:32:00 +0000 (UTC) Received: (qmail 22935 invoked by uid 500); 3 Mar 2015 19:31:41 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 22608 invoked by uid 500); 3 Mar 2015 19:31:41 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 20616 invoked by uid 99); 3 Mar 2015 19:31:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 19:31:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 23FADE1080; Tue, 3 Mar 2015 19:31:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjshen@apache.org To: common-commits@hadoop.apache.org Date: Tue, 03 Mar 2015 19:32:07 -0000 Message-Id: In-Reply-To: <904c2cf0a64649c19c869743df7bb26c@git.apache.org> References: <904c2cf0a64649c19c869743df7bb26c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/43] hadoop git commit: YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's available resource-limit from the parent queue. Contributed by Wangda Tan. YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's available resource-limit from the parent queue. Contributed by Wangda Tan. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/14dd647c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/14dd647c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/14dd647c Branch: refs/heads/YARN-2928 Commit: 14dd647c556016d351f425ee956ccf800ccb9ce2 Parents: abac6eb Author: Vinod Kumar Vavilapalli Authored: Mon Mar 2 17:52:47 2015 -0800 Committer: Vinod Kumar Vavilapalli Committed: Mon Mar 2 17:52:47 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/ResourceLimits.java | 40 +++ .../scheduler/ResourceUsage.java | 61 ++--- .../scheduler/capacity/AbstractCSQueue.java | 24 +- .../scheduler/capacity/CSQueue.java | 11 +- .../scheduler/capacity/CSQueueUtils.java | 48 ---- .../capacity/CapacityHeadroomProvider.java | 16 +- .../scheduler/capacity/CapacityScheduler.java | 30 ++- .../scheduler/capacity/LeafQueue.java | 131 +++++----- .../scheduler/capacity/ParentQueue.java | 53 +++- .../yarn/server/resourcemanager/MockAM.java | 11 +- .../scheduler/TestResourceUsage.java | 2 +- .../capacity/TestApplicationLimits.java | 32 +-- .../scheduler/capacity/TestCSQueueUtils.java | 250 ------------------- .../capacity/TestCapacityScheduler.java | 85 ++++++- .../scheduler/capacity/TestChildQueueOrder.java | 36 ++- .../scheduler/capacity/TestLeafQueue.java | 221 ++++++++++------ .../scheduler/capacity/TestParentQueue.java | 106 ++++---- .../scheduler/capacity/TestReservations.java | 100 +++++--- 19 files changed, 646 insertions(+), 614 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d07aa26..0850f0b 100644 --- 
a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -686,6 +686,9 @@ Release 2.7.0 - UNRELEASED YARN-3270. Fix node label expression not getting set in ApplicationSubmissionContext (Rohit Agarwal via wangda) + YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's + available resource-limit from the parent queue. (Wangda Tan via vinodkv) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java new file mode 100644 index 0000000..12333e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Resource limits for queues/applications, this means max overall (please note + * that, it's not "extra") resource you can get. + */ +public class ResourceLimits { + public ResourceLimits(Resource limit) { + this.limit = limit; + } + + volatile Resource limit; + public Resource getLimit() { + return limit; + } + + public void setLimit(Resource limit) { + this.limit = limit; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index c651878..de44bbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -50,11 +50,12 @@ public class ResourceUsage { writeLock = lock.writeLock(); usages = new HashMap(); + usages.put(NL, new UsageByLabel(NL)); } // Usage enum here to make implement cleaner private enum ResourceType { - USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4); + USED(0), PENDING(1), AMUSED(2), RESERVED(3); private int idx; @@ -71,7 +72,18 @@ public class ResourceUsage { resArr = new Resource[ResourceType.values().length]; for (int i = 0; i < resArr.length; i++) { resArr[i] = Resource.newInstance(0, 0); - } + }; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{used=" + resArr[0] + "%, "); + sb.append("pending=" + resArr[1] + "%, "); + sb.append("am_used=" + resArr[2] + "%, "); + sb.append("reserved=" + resArr[3] + "%, "); + sb.append("headroom=" + resArr[4] + "%}"); + return sb.toString(); } } @@ -181,41 +193,6 @@ public class ResourceUsage { } /* - * Headroom - */ - public Resource getHeadroom() { - return getHeadroom(NL); - } - - public Resource getHeadroom(String label) { - return _get(label, ResourceType.HEADROOM); - } - - public void incHeadroom(String label, Resource res) { - _inc(label, ResourceType.HEADROOM, res); - } - - public void incHeadroom(Resource res) { - incHeadroom(NL, res); - } - - public void decHeadroom(Resource res) { - decHeadroom(NL, res); - } - - public void decHeadroom(String label, Resource res) { - _dec(label, ResourceType.HEADROOM, res); - } - - public void setHeadroom(Resource res) { - setHeadroom(NL, res); - } - - public void setHeadroom(String label, Resource res) { - _set(label, ResourceType.HEADROOM, res); - } - - /* * AM-Used */ public Resource getAMUsed() { @@ -309,4 +286,14 @@ public class ResourceUsage { writeLock.unlock(); } } + + @Override + public String toString() { + try { + 
readLock.lock(); + return usages.toString(); + } finally { + readLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index eb7218b..d800709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -40,9 +40,11 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; @@ -52,7 +54,7 @@ public abstract class AbstractCSQueue implements CSQueue { 
final String queueName; volatile int numContainers; - Resource minimumAllocation; + final Resource minimumAllocation; Resource maximumAllocation; QueueState state; final QueueMetrics metrics; @@ -94,6 +96,7 @@ public abstract class AbstractCSQueue implements CSQueue { cs.getConf()); this.csContext = cs; + this.minimumAllocation = csContext.getMinimumResourceCapability(); // initialize ResourceUsage queueUsage = new ResourceUsage(); @@ -248,7 +251,6 @@ public abstract class AbstractCSQueue implements CSQueue { // After we setup labels, we can setup capacities setupConfigurableCapacities(); - this.minimumAllocation = csContext.getMinimumResourceCapability(); this.maximumAllocation = csContext.getConfiguration().getMaximumAllocationPerQueue( getQueuePath()); @@ -403,4 +405,22 @@ public abstract class AbstractCSQueue implements CSQueue { return csConf.getPreemptionDisabled(q.getQueuePath(), parentQ.getPreemptionDisabled()); } + + protected Resource getCurrentResourceLimit(Resource clusterResource, + ResourceLimits currentResourceLimits) { + /* + * Queue's max available resource = min(my.max, my.limit) + * my.limit is set by my parent, considered used resource of my siblings + */ + Resource queueMaxResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource, + queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation); + Resource queueCurrentResourceLimit = + Resources.min(resourceCalculator, clusterResource, queueMaxResource, + currentResourceLimits.getLimit()); + queueCurrentResourceLimit = + Resources.roundDown(resourceCalculator, queueCurrentResourceLimit, + minimumAllocation); + return queueCurrentResourceLimit; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 5cf38c1..0a60acc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -189,10 +190,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * @param clusterResource the resource of the cluster. * @param node node on which resources are available * @param needToUnreserve assign container only if it can unreserve one first + * @param resourceLimits how much overall resource of this queue can use. 
* @return the assignment */ - public CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve); + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits resourceLimits); /** * A container assigned to the queue has completed. @@ -231,8 +234,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { /** * Update the cluster resource for queues as we add/remove nodes * @param clusterResource the current cluster resource + * @param resourceLimits the current ResourceLimits */ - public void updateClusterResource(Resource clusterResource); + public void updateClusterResource(Resource clusterResource, + ResourceLimits resourceLimits); /** * Get the {@link ActiveUsersManager} for the queue. http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 865b0b4..1921195 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -225,52 +225,4 @@ class CSQueueUtils { ) ); } - - public 
static float getAbsoluteMaxAvailCapacity( - ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) { - CSQueue parent = queue.getParent(); - if (parent == null) { - return queue.getAbsoluteMaximumCapacity(); - } - - //Get my parent's max avail, needed to determine my own - float parentMaxAvail = getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, parent); - //...and as a resource - Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail); - - //check for no resources parent before dividing, if so, max avail is none - if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { - return 0.0f; - } - //sibling used is parent used - my used... - float siblingUsedCapacity = Resources.ratio( - resourceCalculator, - Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), - parentResource); - //my max avail is the lesser of my max capacity and what is unused from my parent - //by my siblings (if they are beyond their base capacity) - float maxAvail = Math.min( - queue.getMaximumCapacity(), - 1.0f - siblingUsedCapacity); - //and, mutiply by parent to get absolute (cluster relative) value - float absoluteMaxAvail = maxAvail * parentMaxAvail; - - if (LOG.isDebugEnabled()) { - LOG.debug("qpath " + queue.getQueuePath()); - LOG.debug("parentMaxAvail " + parentMaxAvail); - LOG.debug("siblingUsedCapacity " + siblingUsedCapacity); - LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity()); - LOG.debug("maxAvail " + maxAvail); - LOG.debug("absoluteMaxAvail " + absoluteMaxAvail); - } - - if (absoluteMaxAvail < 0.0f) { - absoluteMaxAvail = 0.0f; - } else if (absoluteMaxAvail > 1.0f) { - absoluteMaxAvail = 1.0f; - } - - return absoluteMaxAvail; - } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java index f79d195..c6524c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -26,32 +26,32 @@ public class CapacityHeadroomProvider { LeafQueue queue; FiCaSchedulerApp application; Resource required; - LeafQueue.QueueHeadroomInfo queueHeadroomInfo; + LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo; public CapacityHeadroomProvider( LeafQueue.User user, LeafQueue queue, FiCaSchedulerApp application, Resource required, - LeafQueue.QueueHeadroomInfo queueHeadroomInfo) { + LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) { this.user = user; this.queue = queue; this.application = application; this.required = required; - this.queueHeadroomInfo = queueHeadroomInfo; + this.queueResourceLimitsInfo = queueResourceLimitsInfo; } public Resource getHeadroom() { - Resource queueMaxCap; + Resource queueCurrentLimit; Resource clusterResource; - synchronized (queueHeadroomInfo) { - queueMaxCap = 
queueHeadroomInfo.getQueueMaxCap(); - clusterResource = queueHeadroomInfo.getClusterResource(); + synchronized (queueResourceLimitsInfo) { + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); + clusterResource = queueResourceLimitsInfo.getClusterResource(); } - Resource headroom = queue.getHeadroom(user, queueMaxCap, + Resource headroom = queue.getHeadroom(user, queueCurrentLimit, clusterResource, application, required); // Corner case to deal with applications being slightly over-limit http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6b9d846..28ce264 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -33,7 +34,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.HashSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -84,12 +85,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -112,11 +117,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; - @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -499,7 +499,8 @@ public class CapacityScheduler extends initializeQueueMappings(); // Re-calculate headroom for active applications - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); labelManager.reinitializeQueueLabels(getQueueToLabels()); setQueueAcls(authorizer, queues); @@ -990,7 +991,8 @@ public class CapacityScheduler extends private synchronized void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { updateNodeResource(nm, resourceOption); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); } /** @@ -1060,7 +1062,8 @@ public class CapacityScheduler extends LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); CSAssignment assignment = queue.assignContainers(clusterResource, node, - false); + false, new ResourceLimits( + clusterResource)); RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { @@ -1084,7 +1087,8 @@ public class CapacityScheduler extends LOG.debug("Trying to schedule on node: " + 
node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers(clusterResource, node, false); + root.assignContainers(clusterResource, node, false, new ResourceLimits( + clusterResource)); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1205,7 +1209,8 @@ public class CapacityScheduler extends usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); int numNodes = numNodeManagers.incrementAndGet(); updateMaximumAllocation(schedulerNode, true); @@ -1234,7 +1239,8 @@ public class CapacityScheduler extends return; } Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); int numNodes = numNodeManagers.decrementAndGet(); if (scheduleAsynchronously && numNodes == 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 38d4712..3910ac8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -115,7 +116,10 @@ public class LeafQueue extends AbstractCSQueue { // absolute capacity as a resource (based on cluster resource) private Resource absoluteCapacityResource = Resources.none(); - private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + private final QueueResourceLimitsInfo queueResourceLimitsInfo = + new QueueResourceLimitsInfo(); + + private volatile ResourceLimits currentResourceLimits = null; public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -145,13 +149,14 @@ public class LeafQueue extends AbstractCSQueue { this.lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); + this.currentResourceLimits = new ResourceLimits(clusterResource); + // Initialize headroom info, also used for calculating application // master resource limits. 
Since this happens during queue initialization // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -544,12 +549,12 @@ public class LeafQueue extends AbstractCSQueue { * become busy. * */ - Resource queueMaxCap; - synchronized (queueHeadroomInfo) { - queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); + Resource queueCurrentLimit; + synchronized (queueResourceLimitsInfo) { + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); } Resource queueCap = Resources.max(resourceCalculator, lastClusterResource, - absoluteCapacityResource, queueMaxCap); + absoluteCapacityResource, queueCurrentLimit); return Resources.multiplyAndNormalizeUp( resourceCalculator, queueCap, @@ -733,8 +738,10 @@ public class LeafQueue extends AbstractCSQueue { @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, boolean needToUnreserve) { - + FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits currentResourceLimits) { + this.currentResourceLimits = currentResourceLimits; + if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " #applications=" + activeApplications.size()); @@ -876,9 +883,9 @@ public class LeafQueue extends AbstractCSQueue { } - private synchronized CSAssignment - assignReservedContainer(FiCaSchedulerApp application, - FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) { + private synchronized CSAssignment assignReservedContainer( + FiCaSchedulerApp application, FiCaSchedulerNode node, + RMContainer rmContainer, Resource 
clusterResource) { // Do we still need this reservation? Priority priority = rmContainer.getReservedPriority(); if (application.getTotalRequiredResources(priority) == 0) { @@ -895,13 +902,13 @@ public class LeafQueue extends AbstractCSQueue { return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } - protected Resource getHeadroom(User user, Resource queueMaxCap, + protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, Resource required) { - return getHeadroom(user, queueMaxCap, clusterResource, + return getHeadroom(user, queueCurrentLimit, clusterResource, computeUserLimit(application, clusterResource, required, user, null)); } - private Resource getHeadroom(User user, Resource queueMaxCap, + private Resource getHeadroom(User user, Resource currentResourceLimit, Resource clusterResource, Resource userLimit) { /** * Headroom is: @@ -923,8 +930,11 @@ public class LeafQueue extends AbstractCSQueue { Resource headroom = Resources.min(resourceCalculator, clusterResource, Resources.subtract(userLimit, user.getUsed()), - Resources.subtract(queueMaxCap, queueUsage.getUsed()) + Resources.subtract(currentResourceLimit, queueUsage.getUsed()) ); + // Normalize it before return + headroom = + Resources.roundDown(resourceCalculator, headroom, minimumAllocation); return headroom; } @@ -1012,23 +1022,17 @@ public class LeafQueue extends AbstractCSQueue { return canAssign; } - private Resource updateHeadroomInfo(Resource clusterResource, - float absoluteMaxAvailCapacity) { - - Resource queueMaxCap = - Resources.multiplyAndNormalizeDown( - resourceCalculator, - clusterResource, - absoluteMaxAvailCapacity, - minimumAllocation); - - synchronized (queueHeadroomInfo) { - queueHeadroomInfo.setQueueMaxCap(queueMaxCap); - queueHeadroomInfo.setClusterResource(clusterResource); - } - - return queueMaxCap; + private Resource computeQueueCurrentLimitAndSetHeadroomInfo( + Resource clusterResource) { + Resource 
queueCurrentResourceLimit = + getCurrentResourceLimit(clusterResource, currentResourceLimits); + synchronized (queueResourceLimitsInfo) { + queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit); + queueResourceLimitsInfo.setClusterResource(clusterResource); + } + + return queueCurrentResourceLimit; } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) @@ -1043,28 +1047,22 @@ public class LeafQueue extends AbstractCSQueue { computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - //Max avail capacity needs to take into account usage by ancestor-siblings - //which are greater than their base capacity, so we are interested in "max avail" - //capacity - float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, this); - - Resource queueMaxCap = - updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); + Resource currentResourceLimit = + computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); Resource headroom = - getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); + getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxCap=" + queueMaxCap + + " queueMaxAvailRes=" + currentResourceLimit + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider( - queueUser, this, application, required, queueHeadroomInfo); + queueUser, this, application, required, queueResourceLimitsInfo); application.setHeadroomProvider(headroomProvider); @@ -1249,7 +1247,7 @@ public class LeafQueue extends AbstractCSQueue { application.getResourceRequest(priority, node.getNodeName()); if (nodeLocalResourceRequest != null) { assigned = - assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, + assignNodeLocalContainers(clusterResource, 
nodeLocalResourceRequest, node, application, priority, reservedContainer, needToUnreserve); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1265,8 +1263,8 @@ public class LeafQueue extends AbstractCSQueue { return SKIP_ASSIGNMENT; } - assigned = - assignRackLocalContainers(clusterResource, rackLocalResourceRequest, + assigned = + assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, needToUnreserve); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -1282,10 +1280,10 @@ public class LeafQueue extends AbstractCSQueue { return SKIP_ASSIGNMENT; } - return new CSAssignment( - assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, application, priority, reservedContainer, needToUnreserve), - NodeType.OFF_SWITCH); + return new CSAssignment(assignOffSwitchContainers(clusterResource, + offSwitchResourceRequest, node, application, priority, + reservedContainer, needToUnreserve), + NodeType.OFF_SWITCH); } return SKIP_ASSIGNMENT; @@ -1373,7 +1371,7 @@ public class LeafQueue extends AbstractCSQueue { ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { - if (canAssign(application, priority, node, NodeType.NODE_LOCAL, + if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, @@ -1383,9 +1381,9 @@ public class LeafQueue extends AbstractCSQueue { return Resources.none(); } - private Resource assignRackLocalContainers( - Resource clusterResource, ResourceRequest rackLocalResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + private Resource assignRackLocalContainers(Resource 
clusterResource, + ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { @@ -1397,9 +1395,9 @@ public class LeafQueue extends AbstractCSQueue { return Resources.none(); } - private Resource assignOffSwitchContainers( - Resource clusterResource, ResourceRequest offSwitchResourceRequest, - FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + private Resource assignOffSwitchContainers(Resource clusterResource, + ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { @@ -1753,15 +1751,16 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized void updateClusterResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource, + ResourceLimits currentResourceLimits) { + this.currentResourceLimits = currentResourceLimits; lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -1951,16 +1950,16 @@ public class LeafQueue extends AbstractCSQueue { * Holds shared values used by all applications in * the queue to calculate headroom on demand */ - static class QueueHeadroomInfo { - private Resource queueMaxCap; + static class QueueResourceLimitsInfo { + private Resource queueCurrentLimit; 
private Resource clusterResource; - public void setQueueMaxCap(Resource queueMaxCap) { - this.queueMaxCap = queueMaxCap; + public void setQueueCurrentLimit(Resource currentLimit) { + this.queueCurrentLimit = currentLimit; } - public Resource getQueueMaxCap() { - return queueMaxCap; + public Resource getQueueCurrentLimit() { + return queueCurrentLimit; } public void setClusterResource(Resource clusterResource) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a26b0aa..7feaa15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -378,8 +379,9 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized CSAssignment assignContainers( - Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) { + public synchronized CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits resourceLimits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); Set nodeLabels = node.getLabels(); @@ -408,7 +410,8 @@ public class ParentQueue extends AbstractCSQueue { // Schedule CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve); + assignContainersToChildQueues(clusterResource, node, + localNeedToUnreserve | needToUnreserve, resourceLimits); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -530,8 +533,29 @@ public class ParentQueue extends AbstractCSQueue { node.getAvailableResource(), minimumAllocation); } - private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, - FiCaSchedulerNode node, boolean needToUnreserve) { + private ResourceLimits getResourceLimitsOfChild(CSQueue child, + Resource clusterResource, ResourceLimits myLimits) { + /* + * Set head-room of a given child, limit = + * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used + * + child.used. 
To avoid any of this queue's and its ancestors' limit + * being violated + */ + Resource myCurrentLimit = + getCurrentResourceLimit(clusterResource, myLimits); + // My available resource = my-current-limit - my-used-resource + Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit, + getUsedResources()); + // Child's limit = my-available-resource + resource-already-used-by-child + Resource childLimit = + Resources.add(myMaxAvailableResource, child.getUsedResources()); + + return new ResourceLimits(childLimit); + } + + private synchronized CSAssignment assignContainersToChildQueues( + Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve, + ResourceLimits limits) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -544,7 +568,14 @@ public class ParentQueue extends AbstractCSQueue { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } - assignment = childQueue.assignContainers(cluster, node, needToUnreserve); + + // Get ResourceLimits of child queue before assign containers + ResourceLimits childLimits = + getResourceLimitsOfChild(childQueue, cluster, limits); + + assignment = + childQueue.assignContainers(cluster, node, needToUnreserve, + childLimits); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + @@ -638,10 +669,14 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void updateClusterResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource, + ResourceLimits resourceLimits) { // Update all children for (CSQueue childQueue : childQueues) { - childQueue.updateClusterResource(clusterResource); + // Get ResourceLimits of child queue before assign containers + ResourceLimits childLimits = + getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits); + 
childQueue.updateClusterResource(clusterResource, childLimits); } // Update metrics @@ -728,4 +763,4 @@ public class ParentQueue extends AbstractCSQueue { public synchronized int getNumApplications() { return numApplications; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index e1b8a3d..494f5a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,14 +23,12 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import org.junit.Assert; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; public class MockAM { @@ -53,6 +52,7 @@ public class MockAM { private RMContext context; private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; + private volatile AllocateResponse lastResponse; private final List requests = new ArrayList(); private final List releases = new ArrayList(); @@ -223,7 +223,8 @@ public class MockAM { context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); - return doAllocateAs(ugi, allocateRequest); + lastResponse = doAllocateAs(ugi, allocateRequest); + return lastResponse; } public AllocateResponse doAllocateAs(UserGroupInformation ugi, @@ -240,6 +241,10 @@ public class MockAM { throw (Exception) e.getCause(); } } + + public AllocateResponse doHeartbeat() throws Exception { + return allocate(null, null); + } public void unregisterAppAttempt() throws Exception { waitForState(RMAppAttemptState.RUNNING); http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java index b6dfacb..f0bf892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java @@ -38,7 +38,7 @@ public class TestResourceUsage { @Parameterized.Parameters public static Collection getParameters() { return Arrays.asList(new String[][] { { "Pending" }, { "Used" }, - { "Headroom" }, { "Reserved" }, { "AMUsed" } }); + { "Reserved" }, { "AMUsed" } }); } public TestResourceUsage(String suffix) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 81a5aad..8cad057 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -21,15 +21,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.mockito.Matchers; -import org.mockito.Mockito; import java.io.IOException; import java.util.ArrayList; @@ -42,8 +37,8 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; @@ -53,9 +48,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -63,7 +59,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.Ignore; +import org.mockito.Matchers; +import org.mockito.Mockito; public class TestApplicationLimits { @@ -171,7 +168,9 @@ public class TestApplicationLimits { // am limit is 4G initially (based on the queue absolute capacity) // when there is only 1 user, and drops to 2G (the userlimit) when there // is a second user - queue.updateClusterResource(Resource.newInstance(80 * GB, 40)); + Resource clusterResource = Resource.newInstance(80 * GB, 40); + queue.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class); when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); @@ -289,7 +288,8 @@ public class TestApplicationLimits { // Add some nodes to the cluster & test new limits clusterResource = Resources.createResource(120 * 16 * GB); - root.updateClusterResource(clusterResource); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1)); assertEquals(queue.getUserAMResourceLimit(), @@ -611,7 +611,8 @@ public class TestApplicationLimits { app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false); + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -630,7 +631,8 @@ 
public class TestApplicationLimits { app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change @@ -649,7 +651,8 @@ public class TestApplicationLimits { app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute - queue.assignContainers(clusterResource, node_0, false); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom()); @@ -657,7 +660,8 @@ public class TestApplicationLimits { // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); - queue.assignContainers(clusterResource, node_0, false); // Schedule to compute + queue.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java deleted file mode 100644 index 5135ba9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Test; - -public class TestCSQueueUtils { - - private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class); - - final static int GB = 1024; - - @Test - public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception { - runInvalidDivisorTest(false); - runInvalidDivisorTest(true); - } - - public void runInvalidDivisorTest(boolean useDominant) throws Exception { - - ResourceCalculator resourceCalculator; - Resource clusterResource; - if (useDominant) { - resourceCalculator = new DominantResourceCalculator(); - clusterResource = Resources.createResource(10, 0); - } else { - resourceCalculator = new DefaultResourceCalculator(); - clusterResource = Resources.createResource(0, 99); - } - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). 
- thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(0, 0)); - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 90); - csConf.setMaximumCapacity(L1Q1P, 90); - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - - LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root)); - - LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1)); - - assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1), 0.000001f); - - } - - @Test - public void testAbsoluteMaxAvailCapacityNoUse() throws Exception { - - ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). 
- thenReturn(Resources.createResource(16*GB, 32)); - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 90); - csConf.setMaximumCapacity(L1Q1P, 90); - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - - LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root)); - - LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1)); - - assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root), 0.000001f); - - assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1), 0.000001f); - - } - - @Test - public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { - - ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). 
- thenReturn(Resources.createResource(16*GB, 32)); - - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - final String L1Q2 = "L1Q2"; - final String L2Q1 = "L2Q1"; - final String L2Q2 = "L2Q2"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2, - L2Q1, L2Q2}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 80); - csConf.setMaximumCapacity(L1Q1P, 80); - - final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2; - csConf.setCapacity(L1Q2P, 20); - csConf.setMaximumCapacity(L1Q2P, 100); - - final String L2Q1P = L1Q1P + "." + L2Q1; - csConf.setCapacity(L2Q1P, 50); - csConf.setMaximumCapacity(L2Q1P, 50); - - final String L2Q2P = L1Q1P + "." + L2Q2; - csConf.setCapacity(L2Q2P, 50); - csConf.setMaximumCapacity(L2Q2P, 50); - - float result; - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null); - LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null); - LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null); - - //no usage, all based on maxCapacity (prior behavior) - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.4f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //some usage, but below the base capacity - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.4f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //usage gt base on parent sibling - 
root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //same as last, but with usage also on direct parent - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //add to direct sibling, below the threshold of effect at present - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //add to direct sibling, now above the threshold of effect - //(it's cumulative with prior tests) - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.1f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - - } - -} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index fabf47d..83ab104 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -359,7 +360,8 @@ public class TestCapacityScheduler { resourceManager.getResourceScheduler().handle(nodeUpdate); } - private void setupQueueConfiguration(CapacitySchedulerConfiguration 
conf) { + private CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); @@ -383,6 +385,7 @@ public class TestCapacityScheduler { conf.setUserLimitFactor(B3, 100.0f); LOG.info("Setup top-level queues a and b"); + return conf; } @Test @@ -2400,6 +2403,86 @@ public class TestCapacityScheduler { assertEquals("queue B2 max vcores allocation", 12, ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); } + + private void waitContainerAllocated(MockAM am, int mem, int nContainer, + int startContainerId, MockRM rm, MockNM nm) throws Exception { + for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) { + am.allocate("*", mem, 1, new ArrayList()); + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), cId); + Assert.assertTrue(rm.waitForState(nm, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + } + } + + @Test + public void testHierarchyQueuesCurrentLimits() throws Exception { + /* + * Queue tree: + * Root + * / \ + * A B + * / \ / | \ + * A1 A2 B1 B2 B3 + */ + YarnConfiguration conf = + new YarnConfiguration( + setupQueueConfiguration(new CapacitySchedulerConfiguration())); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1); + + // Maximum resource of b1 is 100 * 0.895 * 0.792 = 71 GB + // 2 GBs used by am, so it's 71 - 2 = 69G. 
+ Assert.assertEquals(69 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total) + waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1); + + // Allocated one more container with 1 GB resource in b1 + waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1); + + // Total is 100 GB, + // B2 uses 41 GB (5 * 8GB containers and 1 AM container) + // B1 uses 3 GB (2 * 1GB containers and 1 AM container) + // Available is 100 - 41 - 3 = 56 GB + Assert.assertEquals(56 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + + // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom + // of app1 (in queue b1) updated correctly + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total) + waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1); + + // Allocated one more container with 1 GB resource in b1 + waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1); + + // Total is 100 GB, + // B2 uses 41 GB (5 * 8GB containers and 1 AM container) + // B1 uses 4 GB (3 * 1GB containers and 1 AM container) + // A1 uses 25 GB (3 * 8GB containers and 1 AM container) + // Available is 100 - 41 - 4 - 25 = 30 GB + Assert.assertEquals(30 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + } private void setMaxAllocMb(Configuration conf, int maxAllocMb) { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index af58a43..7edb17d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -143,7 +144,9 @@ public class TestChildQueueOrder { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). 
- when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean()); + when(queue) + .assignContainers(eq(clusterResource), eq(node), anyBoolean(), + any(ResourceLimits.class)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -154,7 +157,8 @@ public class TestChildQueueOrder { return new CSAssignment(allocatedResource, type); } }). - when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean()); + when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), + any(ResourceLimits.class)); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -270,14 +274,16 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); for(int i=0; i < 2; i++) { stubQueueAllocation(a, clusterResource, node_0, 0*GB); stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } for(int i=0; i < 3; i++) { @@ -285,7 +291,8 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } for(int i=0; i < 4; i++) { @@ -293,7 +300,8 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, 
node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -326,7 +334,8 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); } verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -353,7 +362,8 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 3*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -379,7 +389,8 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 0*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -393,12 +404,13 @@ public class TestChildQueueOrder { stubQueueAllocation(b, clusterResource, node_0, 1*GB); stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, 
clusterResource, node_0, 1*GB); - root.assignContainers(clusterResource, node_0, false); + root.assignContainers(clusterResource, node_0, false, new ResourceLimits( + clusterResource)); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyBoolean()); + any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource);