Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2A127200D7F for ; Wed, 13 Dec 2017 00:58:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 28A09160C27; Tue, 12 Dec 2017 23:58:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B4AE9160C24 for ; Wed, 13 Dec 2017 00:58:47 +0100 (CET) Received: (qmail 23691 invoked by uid 500); 12 Dec 2017 23:58:39 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 23315 invoked by uid 99); 12 Dec 2017 23:58:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Dec 2017 23:58:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D661EEE45; Tue, 12 Dec 2017 23:58:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xyao@apache.org To: common-commits@hadoop.apache.org Date: Tue, 12 Dec 2017 23:59:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [32/50] hadoop git commit: YARN-7473. Implement Framework and policy for capacity management of auto created queues. (Suma Shivaprasad via wangda) archived-at: Tue, 12 Dec 2017 23:58:50 -0000 YARN-7473. Implement Framework and policy for capacity management of auto created queues. 
(Suma Shivaprasad via wangda) Change-Id: Icca7805fe12f6f7fb335effff4b121b6f7f6337b Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b38643c9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b38643c9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b38643c9 Branch: refs/heads/HDFS-7240 Commit: b38643c9a8dd2c53024ae830b9565a550d0ec39c Parents: 74665e3 Author: Wangda Tan Authored: Fri Dec 8 15:10:16 2017 -0800 Committer: Wangda Tan Committed: Fri Dec 8 15:10:16 2017 -0800 ---------------------------------------------------------------------- .../monitor/SchedulingMonitor.java | 4 +- .../ProportionalCapacityPreemptionPolicy.java | 7 +- .../monitor/capacity/TempQueuePerPartition.java | 8 + .../CapacitySchedulerPlanFollower.java | 12 +- .../scheduler/AbstractYarnScheduler.java | 2 +- .../scheduler/YarnScheduler.java | 2 +- .../capacity/AbstractAutoCreatedLeafQueue.java | 113 +++ .../scheduler/capacity/AbstractCSQueue.java | 101 ++- .../capacity/AbstractManagedParentQueue.java | 162 ++-- .../capacity/AutoCreatedLeafQueue.java | 136 ++-- .../capacity/AutoCreatedLeafQueueConfig.java | 66 ++ .../AutoCreatedQueueManagementPolicy.java | 64 ++ .../scheduler/capacity/CSQueue.java | 2 +- .../scheduler/capacity/CSQueueUtils.java | 33 +- .../scheduler/capacity/CapacityScheduler.java | 104 +-- .../CapacitySchedulerConfiguration.java | 112 ++- .../capacity/CapacitySchedulerContext.java | 8 + .../capacity/CapacitySchedulerQueueManager.java | 6 +- .../scheduler/capacity/LeafQueue.java | 75 +- .../scheduler/capacity/ManagedParentQueue.java | 294 +++++++- .../scheduler/capacity/PlanQueue.java | 150 +++- .../capacity/QueueManagementChange.java | 148 ++++ .../QueueManagementDynamicEditPolicy.java | 272 +++++++ .../scheduler/capacity/ReservationQueue.java | 91 +++ .../GuaranteedOrZeroCapacityOverTimePolicy.java | 745 +++++++++++++++++++ .../scheduler/common/QueueEntitlement.java | 22 + 
.../event/QueueManagementChangeEvent.java | 49 ++ .../scheduler/event/SchedulerEventType.java | 5 +- .../capacity/TestAutoCreatedLeafQueue.java | 113 --- ...stCapacitySchedulerAutoCreatedQueueBase.java | 579 ++++++++++++++ .../TestCapacitySchedulerAutoQueueCreation.java | 611 ++++++--------- .../TestCapacitySchedulerDynamicBehavior.java | 32 +- ...tGuaranteedOrZeroCapacityOverTimePolicy.java | 40 + .../scheduler/capacity/TestLeafQueue.java | 125 +++- .../TestQueueManagementDynamicEditPolicy.java | 121 +++ .../capacity/TestReservationQueue.java | 114 +++ 36 files changed, 3629 insertions(+), 899 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 09edb98..d1cc850 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -79,7 +79,7 @@ public class SchedulingMonitor extends AbstractService { } private void schedulePreemptionChecker() { - handler = ses.scheduleAtFixedRate(new PreemptionChecker(), + handler = ses.scheduleAtFixedRate(new PolicyInvoker(), 0, 
monitorInterval, TimeUnit.MILLISECONDS); } @@ -99,7 +99,7 @@ public class SchedulingMonitor extends AbstractService { scheduleEditPolicy.editSchedule(); } - private class PreemptionChecker implements Runnable { + private class PolicyInvoker implements Runnable { @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 8327cb9..304d204 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuot import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ManagedParentQueue; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; @@ -377,7 +380,9 @@ public class ProportionalCapacityPreemptionPolicy } private Set getLeafQueueNames(TempQueuePerPartition q) { - if (q.children == null || q.children.isEmpty()) { + // If its a ManagedParentQueue, it might not have any children + if ((q.children == null || q.children.isEmpty()) + && !(q.parentQueue instanceof ManagedParentQueue)) { return ImmutableSet.of(q.queueName); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 4d71223..fdeee52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ParentQueue; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -56,6 +59,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { final ArrayList children; private Collection apps; LeafQueue leafQueue; + ParentQueue parentQueue; boolean preemptionDisabled; protected Resource pendingDeductReserved; @@ -90,6 +94,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { pendingDeductReserved = Resources.createResource(0); } + if (ParentQueue.class.isAssignableFrom(queue.getClass())) { + parentQueue = (ParentQueue) queue; + } + this.normalizedGuarantee = new double[ResourceUtils .getNumberOfKnownResourceTypes()]; this.children = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 2e16689..7962d8e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -28,10 +28,12 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ReservationQueue; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; @@ -92,8 +94,8 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower String planQueueName, Queue queue, String currResId) { PlanQueue planQueue = (PlanQueue)queue; try { - AutoCreatedLeafQueue resQueue = - new AutoCreatedLeafQueue(cs, currResId, planQueue); + ReservationQueue resQueue = + new ReservationQueue(cs, currResId, planQueue); cs.addQueue(resQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( @@ -112,8 +114,8 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower PlanQueue planQueue = (PlanQueue)queue; if (cs.getQueue(defReservationId) == null) { try { - AutoCreatedLeafQueue defQueue = - new AutoCreatedLeafQueue(cs, 
defReservationId, planQueue); + ReservationQueue defQueue = + new ReservationQueue(cs, defReservationId, planQueue); cs.addQueue(defQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d94efb1..cf5e13b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -457,7 +457,7 @@ public abstract class AbstractYarnScheduler } @Override - public void addQueue(Queue newQueue) throws YarnException { + public void addQueue(Queue newQueue) throws YarnException, IOException { throw new YarnException(getClass().getSimpleName() + " does not support this operation"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 111998b..93ca7c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -272,7 +272,7 @@ public interface YarnScheduler extends EventHandler { * @param newQueue the queue being added. * @throws YarnException */ - void addQueue(Queue newQueue) throws YarnException; + void addQueue(Queue newQueue) throws YarnException, IOException; /** * This method increase the entitlement for current queue (must respect http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java new file mode 100644 index 0000000..ac97d72 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; + +/** + * Abstract class for dynamic auto created queues managed by an implementation + * of AbstractManagedParentQueue + */ +public class AbstractAutoCreatedLeafQueue extends LeafQueue { + + protected AbstractManagedParentQueue parent; + + public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs, + String queueName, AbstractManagedParentQueue parent, CSQueue old) + throws IOException { + super(cs, queueName, parent, old); + this.parent = parent; + } + + private static final Logger LOG = LoggerFactory.getLogger( + AbstractAutoCreatedLeafQueue.class); + + public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs, + CapacitySchedulerConfiguration leafQueueConfigs, String queueName, + AbstractManagedParentQueue parent, CSQueue old) throws IOException { + super(cs, leafQueueConfigs, queueName, parent, old); + this.parent = parent; + } + + /** + * This methods to change capacity for a queue and adjusts its + * absoluteCapacity + * + * @param entitlement the new entitlement for the queue (capacity, + * maxCapacity, etc..) + * @throws SchedulerDynamicEditException + */ + public void setEntitlement(QueueEntitlement entitlement) + throws SchedulerDynamicEditException { + setEntitlement(NO_LABEL, entitlement); + } + + /** + * This methods to change capacity for a queue and adjusts its + * absoluteCapacity + * + * @param entitlement the new entitlement for the queue (capacity, + * maxCapacity, etc..) 
+ * @throws SchedulerDynamicEditException + */ + public void setEntitlement(String nodeLabel, QueueEntitlement entitlement) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + float capacity = entitlement.getCapacity(); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + setCapacity(nodeLabel, capacity); + setAbsoluteCapacity(nodeLabel, + getParent().getQueueCapacities(). + getAbsoluteCapacity(nodeLabel) + * getQueueCapacities().getCapacity(nodeLabel)); + // note: we currently set maxCapacity to capacity + // this might be revised later + setMaxCapacity(nodeLabel, entitlement.getMaxCapacity()); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully changed to " + capacity + " for queue " + this + .getQueueName()); + } + + //update queue used capacity etc + CSQueueUtils.updateQueueStatistics(resourceCalculator, + csContext.getClusterResource(), + this, labelManager, nodeLabel); + } finally { + writeLock.unlock(); + } + } + + protected void setupConfigurableCapacities(QueueCapacities queueCapacities) { + CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), + queueCapacities, parent == null ? 
null : parent.getQueueCapacities()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 140ea5d..4df4cf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -128,27 +128,34 @@ public abstract class AbstractCSQueue implements CSQueue { public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { + this(cs, cs.getConfiguration(), queueName, parent, old); + } + + public AbstractCSQueue(CapacitySchedulerContext cs, + CapacitySchedulerConfiguration configuration, String queueName, + CSQueue parent, CSQueue old) { + this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; - this.queuePath = - ((parent == null) ? "" : (parent.getQueuePath() + ".")) + this.queueName; + this.queuePath = ((parent == null) ? 
"" : (parent.getQueuePath() + ".")) + + this.queueName; this.resourceCalculator = cs.getResourceCalculator(); this.activitiesManager = cs.getActivitiesManager(); - + // must be called after parent and queueName is set - this.metrics = - old != null ? (CSQueueMetrics) old.getMetrics() : CSQueueMetrics - .forQueue(getQueuePath(), parent, cs.getConfiguration() - .getEnableUserMetrics(), cs.getConf()); + this.metrics = old != null ? + (CSQueueMetrics) old.getMetrics() : + CSQueueMetrics.forQueue(getQueuePath(), parent, + configuration.getEnableUserMetrics(), cs.getConf()); this.csContext = cs; this.minimumAllocation = csContext.getMinimumResourceCapability(); - + // initialize ResourceUsage queueUsage = new ResourceUsage(); queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath()); - + // initialize QueueCapacities queueCapacities = new QueueCapacities(parent == null); @@ -159,11 +166,16 @@ public abstract class AbstractCSQueue implements CSQueue { readLock = lock.readLock(); writeLock = lock.writeLock(); } - + protected void setupConfigurableCapacities() { + setupConfigurableCapacities(csContext.getConfiguration()); + } + + protected void setupConfigurableCapacities( + CapacitySchedulerConfiguration configuration) { CSQueueUtils.loadUpdateAndCheckCapacities( getQueuePath(), - csContext.getConfiguration(), + configuration, queueCapacities, parent == null ? 
null : parent.getQueueCapacities()); } @@ -275,6 +287,29 @@ public abstract class AbstractCSQueue implements CSQueue { } } + /** + * Set maximum capacity + * @param maximumCapacity new max capacity + */ + void setMaxCapacity(String nodeLabel, float maximumCapacity) { + try { + writeLock.lock(); + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), + queueCapacities.getCapacity(nodeLabel), maximumCapacity); + float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity( + maximumCapacity, parent); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), + queueCapacities.getAbsoluteCapacity(nodeLabel), absMaxCapacity); + + queueCapacities.setMaximumCapacity(maximumCapacity); + queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); + } finally { + writeLock.unlock(); + } + } + + @Override public String getDefaultNodeLabelExpression() { return defaultLabelExpression; @@ -282,13 +317,20 @@ public abstract class AbstractCSQueue implements CSQueue { void setupQueueConfigs(Resource clusterResource) throws IOException { + setupQueueConfigs(clusterResource, csContext.getConfiguration()); + } + + protected void setupQueueConfigs(Resource clusterResource, + CapacitySchedulerConfiguration configuration) throws + IOException { + try { writeLock.lock(); // get labels this.accessibleLabels = - csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + configuration.getAccessibleNodeLabels(getQueuePath()); this.defaultLabelExpression = - csContext.getConfiguration().getDefaultNodeLabelExpression( + configuration.getDefaultNodeLabelExpression( getQueuePath()); this.resourceTypes = new HashSet(); for (AbsoluteResourceType type : AbsoluteResourceType.values()) { @@ -308,7 +350,7 @@ public abstract class AbstractCSQueue implements CSQueue { } // After we setup labels, we can setup capacities - setupConfigurableCapacities(); + setupConfigurableCapacities(configuration); // Also fetch minimum/maximum resource constraint for this queue if // configured. 
@@ -316,20 +358,20 @@ public abstract class AbstractCSQueue implements CSQueue { updateConfigurableResourceRequirement(getQueuePath(), clusterResource); this.maximumAllocation = - csContext.getConfiguration().getMaximumAllocationPerQueue( + configuration.getMaximumAllocationPerQueue( getQueuePath()); // initialized the queue state based on previous state, configured state // and its parent state. QueueState previous = getState(); - QueueState configuredState = csContext.getConfiguration() + QueueState configuredState = configuration .getConfiguredState(getQueuePath()); QueueState parentState = (parent == null) ? null : parent.getState(); initializeQueueState(previous, configuredState, parentState); authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); - this.acls = csContext.getConfiguration().getAcls(getQueuePath()); + this.acls = configuration.getAcls(getQueuePath()); // Update metrics CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, @@ -361,18 +403,21 @@ public abstract class AbstractCSQueue implements CSQueue { this.reservationsContinueLooking = csContext.getConfiguration().getReservationContinueLook(); - this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this, + configuration); - this.priority = csContext.getConfiguration().getQueuePriority( + this.priority = configuration.getQueuePriority( getQueuePath()); - this.userWeights = getUserWeightsFromHierarchy(); + this.userWeights = getUserWeightsFromHierarchy(configuration); } finally { writeLock.unlock(); } } - private Map getUserWeightsFromHierarchy() throws IOException { + private Map getUserWeightsFromHierarchy + (CapacitySchedulerConfiguration configuration) throws + IOException { Map unionInheritedWeights = new HashMap(); CSQueue parentQ = getParent(); if (parentQ != null) { @@ -381,9 +426,8 @@ public abstract class AbstractCSQueue implements CSQueue { } // Insert this queue's user's 
weights, overriding parent's user's weights if // there is overlap. - CapacitySchedulerConfiguration csConf = csContext.getConfiguration(); unionInheritedWeights.putAll( - csConf.getAllUserWeightsForQueue(getQueuePath())); + configuration.getAllUserWeightsForQueue(getQueuePath())); return unionInheritedWeights; } @@ -720,10 +764,11 @@ public abstract class AbstractCSQueue implements CSQueue { * * @return true if queue has preemption disabled, false otherwise */ - private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) { - CapacitySchedulerConfiguration csConf = csContext.getConfiguration(); + private boolean isQueueHierarchyPreemptionDisabled(CSQueue q, + CapacitySchedulerConfiguration configuration) { boolean systemWidePreemption = - csConf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + csContext.getConfiguration() + .getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); CSQueue parentQ = q.getParent(); @@ -735,14 +780,14 @@ public abstract class AbstractCSQueue implements CSQueue { // on, then q does not have preemption disabled (default=false, below) // unless the preemption_disabled property is explicitly set. if (parentQ == null) { - return csConf.getPreemptionDisabled(q.getQueuePath(), false); + return configuration.getPreemptionDisabled(q.getQueuePath(), false); } // If this is not the root queue, inherit the default value for the // preemption_disabled property from the parent. Preemptability will be // inherited from the parent's hierarchy unless explicitly overridden at // this level. 
- return csConf.getPreemptionDisabled(q.getQueuePath(), + return configuration.getPreemptionDisabled(q.getQueuePath(), parentQ.getPreemptionDisabled()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index 46f5cf1..9d38f79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -17,13 +17,21 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Comparator; import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; /** * A container class for automatically created child leaf 
queues. @@ -35,13 +43,12 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { private static final Logger LOG = LoggerFactory.getLogger( AbstractManagedParentQueue.class); - protected AutoCreatedLeafQueueTemplate leafQueueTemplate; + protected AutoCreatedLeafQueueConfig leafQueueTemplate; + protected AutoCreatedQueueManagementPolicy queueManagementPolicy = null; public AbstractManagedParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); - - super.setupQueueConfigs(csContext.getClusterResource()); } @Override @@ -53,52 +60,18 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { // Set new configs setupQueueConfigs(clusterResource); - // run reinitialize on each existing queue, to trigger absolute cap - // recomputations - for (CSQueue res : this.getChildQueues()) { - res.reinitialize(res, clusterResource); - } } finally { writeLock.unlock(); } } /** - * Initialize leaf queue configs from template configurations specified on - * parent queue. - */ - protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs - (String queuePath) { - - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - - AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new - AutoCreatedLeafQueueTemplate.Builder(); - int maxApps = conf.getMaximumApplicationsPerQueue(queuePath); - if (maxApps < 0) { - maxApps = (int) ( - CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS - * getAbsoluteCapacity()); - } - - int userLimit = conf.getUserLimit(queuePath); - float userLimitFactor = conf.getUserLimitFactor(queuePath); - leafQueueTemplateBuilder.userLimit(userLimit) - .userLimitFactor(userLimitFactor) - .maxApps(maxApps) - .maxAppsPerUser( - (int) (maxApps * (userLimit / 100.0f) * userLimitFactor)); - - return leafQueueTemplateBuilder; - } - - /** * Add the specified child queue. 
* @param childQueue reference to the child queue to be added * @throws SchedulerDynamicEditException */ public void addChildQueue(CSQueue childQueue) - throws SchedulerDynamicEditException { + throws SchedulerDynamicEditException, IOException { try { writeLock.lock(); if (childQueue.getCapacity() > 0) { @@ -193,84 +166,69 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { } } - public static class AutoCreatedLeafQueueTemplate { - - private QueueCapacities queueCapacities; - - private int maxApps; - private int maxAppsPerUser; - private int userLimit; - private float userLimitFactor; - - AutoCreatedLeafQueueTemplate(Builder builder) { - this.maxApps = builder.maxApps; - this.maxAppsPerUser = builder.maxAppsPerUser; - this.userLimit = builder.userLimit; - this.userLimitFactor = builder.userLimitFactor; - this.queueCapacities = builder.queueCapacities; - } - - public static class Builder { - private int maxApps; - private int maxAppsPerUser; + public AutoCreatedLeafQueueConfig getLeafQueueTemplate() { + return leafQueueTemplate; + } - private int userLimit; - private float userLimitFactor; + public AutoCreatedQueueManagementPolicy + getAutoCreatedQueueManagementPolicy() { + return queueManagementPolicy; + } - private QueueCapacities queueCapacities; + protected SortedMap getConfigurationsWithPrefix + (SortedMap sortedConfigs, String prefix) { + return sortedConfigs.subMap( prefix, prefix + Character.MAX_VALUE ); + } - Builder maxApps(int maxApplications) { - this.maxApps = maxApplications; - return this; - } + protected SortedMap sortCSConfigurations() { + SortedMap sortedConfigs = new TreeMap( + new Comparator() { + public int compare(String s1, String s2) { + return s1.compareToIgnoreCase(s2); + } - Builder maxAppsPerUser(int maxApplicationsPerUser) { - this.maxAppsPerUser = maxApplicationsPerUser; - return this; - } + }); - Builder userLimit(int usrLimit) { - this.userLimit = usrLimit; - return this; - } + for (final Iterator> iterator = + 
csContext.getConfiguration().iterator(); iterator.hasNext(); ) { + final Map.Entry confKeyValuePair = iterator.next(); + sortedConfigs.put(confKeyValuePair.getKey(), confKeyValuePair.getValue()); + } + return sortedConfigs; + } - Builder userLimitFactor(float ulf) { - this.userLimitFactor = ulf; - return this; - } + protected CapacitySchedulerConfiguration initializeLeafQueueConfigs(String + configPrefix) { - Builder capacities(QueueCapacities capacities) { - this.queueCapacities = capacities; - return this; - } + CapacitySchedulerConfiguration leafQueueConfigs = new + CapacitySchedulerConfiguration(new Configuration(false), false); - AutoCreatedLeafQueueTemplate build() { - return new AutoCreatedLeafQueueTemplate(this); - } - } + SortedMap sortedConfigs = sortCSConfigurations(); + SortedMap templateConfigs = getConfigurationsWithPrefix + (sortedConfigs, configPrefix); - public int getUserLimit() { - return userLimit; + for (final Iterator> iterator = + templateConfigs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry confKeyValuePair = iterator.next(); + leafQueueConfigs.set(confKeyValuePair.getKey(), + confKeyValuePair.getValue()); } - public float getUserLimitFactor() { - return userLimitFactor; - } + return leafQueueConfigs; + } - public QueueCapacities getQueueCapacities() { - return queueCapacities; - } + protected void validateQueueEntitlementChange(AbstractAutoCreatedLeafQueue + leafQueue, QueueEntitlement entitlement) + throws SchedulerDynamicEditException { - public int getMaxApps() { - return maxApps; - } + float sumChilds = sumOfChildCapacities(); + float newChildCap = + sumChilds - leafQueue.getCapacity() + entitlement.getCapacity(); - public int getMaxAppsPerUser() { - return maxAppsPerUser; + if (!(newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON)) { + throw new SchedulerDynamicEditException( + "Sum of child queues should exceed 100% for auto creating parent " + + "queue : " + queueName); } } - - public 
AutoCreatedLeafQueueTemplate getLeafQueueTemplate() { - return leafQueueTemplate; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index bc206d4..1d796ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -21,36 +21,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** - * Leaf queues which are auto created by an underkying implementation of + * Leaf queues which are auto created by an underlying implementation of * AbstractManagedParentQueue. 
Eg: PlanQueue for reservations or * ManagedParentQueue for auto created dynamic queues */ -public class AutoCreatedLeafQueue extends LeafQueue { +public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue { private static final Logger LOG = LoggerFactory .getLogger(AutoCreatedLeafQueue.class); - private AbstractManagedParentQueue parent; - public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, - AbstractManagedParentQueue parent) throws IOException { - super(cs, queueName, parent, null); - - AutoCreatedLeafQueueTemplate leafQueueTemplate = - parent.getLeafQueueTemplate(); - updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), - leafQueueTemplate.getUserLimitFactor(), - leafQueueTemplate.getMaxApps(), - leafQueueTemplate.getMaxAppsPerUser()); - this.parent = parent; + ManagedParentQueue parent) throws IOException { + super(cs, parent.getLeafQueueConfigs(queueName), + queueName, + parent, null); + updateCapacitiesToZero(); } @Override @@ -61,48 +52,75 @@ public class AutoCreatedLeafQueue extends LeafQueue { validate(newlyParsedQueue); - super.reinitialize(newlyParsedQueue, clusterResource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - this, labelManager, null); + ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent; - AutoCreatedLeafQueueTemplate leafQueueTemplate = - parent.getLeafQueueTemplate(); - updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), - leafQueueTemplate.getUserLimitFactor(), - leafQueueTemplate.getMaxApps(), - leafQueueTemplate.getMaxAppsPerUser()); + super.reinitialize(newlyParsedQueue, clusterResource, managedParentQueue + .getLeafQueueConfigs(newlyParsedQueue.getQueueName())); + + //Reset capacities to 0 since reinitialize above + // queueCapacities to initialize to configured capacity which might + // overcommit resources from parent queue + updateCapacitiesToZero(); } finally { writeLock.unlock(); } } - /** - * This methods to change capacity 
for a queue and adjusts its - * absoluteCapacity. - * - * @param entitlement the new entitlement for the queue (capacity, - * maxCapacity) - * @throws SchedulerDynamicEditException - */ - public void setEntitlement(QueueEntitlement entitlement) - throws SchedulerDynamicEditException { + public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig + leafQueueTemplate) throws SchedulerDynamicEditException, IOException { + try { writeLock.lock(); - float capacity = entitlement.getCapacity(); + + // TODO: + // reinitialize only capacities for now since 0 capacity updates + // can cause + // abs capacity related config computations to be incorrect if we go + // through reinitialize + QueueCapacities capacities = leafQueueTemplate.getQueueCapacities(); + + //update abs capacities + setupConfigurableCapacities(capacities); + + //reset capacities for the leaf queue + mergeCapacities(capacities); + + //update queue used capacity for all the node labels + CSQueueUtils.updateQueueStatistics(resourceCalculator, + csContext.getClusterResource(), + this, labelManager, null); + + //activate applications if any are pending + activateApplications(); + + } finally { + writeLock.unlock(); + } + } + + private void mergeCapacities(QueueCapacities capacities) { + for ( String nodeLabel : capacities.getExistingNodeLabels()) { + this.queueCapacities.setCapacity(nodeLabel, + capacities.getCapacity(nodeLabel)); + this.queueCapacities.setAbsoluteCapacity(nodeLabel, capacities + .getAbsoluteCapacity(nodeLabel)); + this.queueCapacities.setMaximumCapacity(nodeLabel, capacities + .getMaximumCapacity(nodeLabel)); + this.queueCapacities.setAbsoluteMaximumCapacity(nodeLabel, capacities + .getAbsoluteMaximumCapacity(nodeLabel)); + } + } + + public void validateConfigurations(AutoCreatedLeafQueueConfig template) + throws SchedulerDynamicEditException { + QueueCapacities capacities = template.getQueueCapacities(); + for (String label : capacities.getExistingNodeLabels()) { + float capacity = 
capacities.getCapacity(label); if (capacity < 0 || capacity > 1.0f) { throw new SchedulerDynamicEditException( "Capacity demand is not in the [0,1] range: " + capacity); } - setCapacity(capacity); - setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); - setMaxCapacity(entitlement.getMaxCapacity()); - if (LOG.isDebugEnabled()) { - LOG.debug("successfully changed to " + capacity + " for queue " + this - .getQueueName()); - } - } finally { - writeLock.unlock(); } } @@ -113,22 +131,20 @@ public class AutoCreatedLeafQueue extends LeafQueue { "Error trying to reinitialize " + getQueuePath() + " from " + newlyParsedQueue.getQueuePath()); } - - } - - @Override - protected void setupConfigurableCapacities() { - CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), - queueCapacities, parent == null ? null : parent.getQueueCapacities()); } - private void updateApplicationAndUserLimits(int userLimit, - float userLimitFactor, - int maxAppsForAutoCreatedQueues, - int maxAppsPerUserForAutoCreatedQueues) { - setUserLimit(userLimit); - setUserLimitFactor(userLimitFactor); - setMaxApplications(maxAppsForAutoCreatedQueues); - setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues); + private void updateCapacitiesToZero() throws IOException { + try { + for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels + ()) { + //TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574 + setEntitlement(nodeLabel, new QueueEntitlement(0.0f, + parent.getLeafQueueTemplate() + .getQueueCapacities() + .getMaximumCapacity())); + } + } catch (SchedulerDynamicEditException e) { + throw new IOException(e); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java new file mode 100644 index 0000000..5952250 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +/** + * Auto Created Leaf queue configurations, capacity + */ +public class AutoCreatedLeafQueueConfig { + + /** + * Template queue capacities - contains configured and derived capacities + * like abs capacity which are used by auto queue creation policy to manage + * leaf queue capacities + */ + private QueueCapacities queueCapacities; + + private CapacitySchedulerConfiguration leafQueueConfigs; + + public AutoCreatedLeafQueueConfig(Builder builder) { + this.queueCapacities = builder.queueCapacities; + this.leafQueueConfigs = builder.leafQueueConfigs; + } + + public static class Builder { + + private QueueCapacities queueCapacities; + private CapacitySchedulerConfiguration leafQueueConfigs; + + public Builder capacities(QueueCapacities capacities) { + this.queueCapacities = capacities; + return this; + } + + public Builder configuration(CapacitySchedulerConfiguration conf) { + this.leafQueueConfigs = conf; + return this; + } + + public AutoCreatedLeafQueueConfig build() { + return new AutoCreatedLeafQueueConfig(this); + } + } + + public QueueCapacities getQueueCapacities() { + return queueCapacities; + } + + public CapacitySchedulerConfiguration getLeafQueueConfigs() { + return leafQueueConfigs; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java new file mode 100644 index 0000000..f7a4bbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import java.util.List; + +public interface AutoCreatedQueueManagementPolicy { + + /** + * Initialize policy + * @param schedulerContext Capacity Scheduler context + */ + void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue); + + /** + * Reinitialize policy state ( if required ) + * @param schedulerContext Capacity Scheduler context + */ + void reinitialize(CapacitySchedulerContext schedulerContext, + ParentQueue parentQueue); + + /** + * Get initial template for the specified leaf queue + * @param leafQueue the leaf queue + * @return initial leaf queue template configurations and capacities for + * auto created queue + */ + AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration( + AbstractAutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException; + + /** + * Compute/Adjust child queue capacities + * for auto created leaf queues + * + * @return returns a list of suggested QueueEntitlementChange(s) which may + * or may not be enforced by the scheduler + */ + List computeQueueManagementChanges() + throws SchedulerDynamicEditException; + + /** + * Commit/Update state for the specified queue management changes. 
+ */ + void commitQueueManagementChanges( + List queueManagementChanges) + throws SchedulerDynamicEditException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 6d79b6a..5dd307c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -230,7 +230,7 @@ public interface CSQueue extends SchedulerQueue { * @param newlyParsedQueue new queue to re-initalize from * @param clusterResource resources in the cluster */ - public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 51e5b17..3901398 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; -class CSQueueUtils { +public class CSQueueUtils { - final static float EPSILON = 0.0001f; + public final static float EPSILON = 0.0001f; /* * Used only by tests @@ -123,12 +123,12 @@ class CSQueueUtils { for (String label : configuredNodelabels) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { - queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL, + queueCapacities.setCapacity(label, csConf.getNonLabeledQueueCapacity(queuePath) / 100); - queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL, + queueCapacities.setMaximumCapacity(label, csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100); queueCapacities.setMaxAMResourcePercentage( - CommonNodeLabelsManager.NO_LABEL, + label, csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); } else { queueCapacities.setCapacity(label, @@ -183,9 +183,32 @@ class CSQueueUtils { if (Resources.greaterThan(rc, totalPartitionResource, totalPartitionResource, Resources.none())) { + Resource queueGuranteedResource = childQueue .getEffectiveCapacity(nodePartition); + //TODO : Modify below code to support Absolute 
Resource configurations + // (YARN-5881) for AutoCreatedLeafQueue + if (Float.compare(queueCapacities.getAbsoluteCapacity + (nodePartition), 0f) == 0 + && childQueue instanceof AutoCreatedLeafQueue) { + + //If absolute capacity is 0 for a leaf queue (could be a managed leaf + // queue, then use the leaf queue's template capacity to compute + // guaranteed resource for used capacity) + + // queueGuaranteed = totalPartitionedResource * + // absolute_capacity(partition) + ManagedParentQueue parentQueue = (ManagedParentQueue) + childQueue.getParent(); + QueueCapacities leafQueueTemplateCapacities = parentQueue + .getLeafQueueTemplate() + .getQueueCapacities(); + queueGuranteedResource = Resources.multiply(totalPartitionResource, + leafQueueTemplateCapacities.getAbsoluteCapacity + (nodePartition)); + } + // make queueGuranteed >= minimum_allocation to avoid divided by 0. queueGuranteedResource = Resources.max(rc, totalPartitionResource, queueGuranteedResource, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index de93a6a..a5efd9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .QueueManagementChangeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -138,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import 
org.apache.hadoop.yarn.util.resource.Resources; @@ -175,6 +179,8 @@ public class CapacityScheduler extends private CSConfigurationProvider csConfProvider; + protected Clock monotonicClock; + @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -1501,7 +1507,7 @@ public class CapacityScheduler extends { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - + updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; @@ -1613,6 +1619,25 @@ public class CapacityScheduler extends } } break; + case MANAGE_QUEUE: + { + QueueManagementChangeEvent queueManagementChangeEvent = + (QueueManagementChangeEvent) event; + ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); + try { + final List queueManagementChanges = + queueManagementChangeEvent.getQueueManagementChanges(); + ((ManagedParentQueue) parentQueue) + .validateAndApplyQueueManagementChanges(queueManagementChanges); + } catch (SchedulerDynamicEditException sde) { + LOG.error("Queue Management Change event cannot be applied for " + + "parent queue : " + parentQueue.getQueueName(), sde); + } catch (IOException ioe) { + LOG.error("Queue Management Change event cannot be applied for " + + "parent queue : " + parentQueue.getQueueName(), ioe); + } + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". 
Ignoring!"); } @@ -1976,12 +2001,14 @@ public class CapacityScheduler extends writeLock.lock(); LOG.info("Removing queue: " + queueName); CSQueue q = this.getQueue(queueName); - if (!(q instanceof AutoCreatedLeafQueue)) { + if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom( + q.getClass()))) { throw new SchedulerDynamicEditException( "The queue that we are asked " + "to remove (" + queueName - + ") is not a AutoCreatedLeafQueue"); + + ") is not a AutoCreatedLeafQueue or ReservationQueue"); } - AutoCreatedLeafQueue disposableLeafQueue = (AutoCreatedLeafQueue) q; + AbstractAutoCreatedLeafQueue disposableLeafQueue = + (AbstractAutoCreatedLeafQueue) q; // at this point we should have no more apps if (disposableLeafQueue.getNumApplications() > 0) { throw new SchedulerDynamicEditException( @@ -1994,8 +2021,8 @@ public class CapacityScheduler extends ((AbstractManagedParentQueue) disposableLeafQueue.getParent()) .removeChildQueue(q); this.queueManager.removeQueue(queueName); - LOG.info("Removal of AutoCreatedLeafQueue " - + queueName + " has succeeded"); + LOG.info( + "Removal of AutoCreatedLeafQueue " + queueName + " has succeeded"); } finally { writeLock.unlock(); } @@ -2003,22 +2030,27 @@ public class CapacityScheduler extends @Override public void addQueue(Queue queue) - throws SchedulerDynamicEditException { + throws SchedulerDynamicEditException, IOException { try { writeLock.lock(); - if (!(queue instanceof AutoCreatedLeafQueue)) { + if (queue == null) { + throw new SchedulerDynamicEditException( + "Queue specified is null. 
Should be an implementation of " + + "AbstractAutoCreatedLeafQueue"); + } else if (!(AbstractAutoCreatedLeafQueue.class + .isAssignableFrom(queue.getClass()))) { throw new SchedulerDynamicEditException( - "Queue " + queue.getQueueName() + " is not a AutoCreatedLeafQueue"); + "Queue is not an implementation of " + + "AbstractAutoCreatedLeafQueue : " + queue.getClass()); } - AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; + AbstractAutoCreatedLeafQueue newQueue = + (AbstractAutoCreatedLeafQueue) queue; - if (newQueue.getParent() == null - || !(AbstractManagedParentQueue.class. + if (newQueue.getParent() == null || !(AbstractManagedParentQueue.class. isAssignableFrom(newQueue.getParent().getClass()))) { throw new SchedulerDynamicEditException( - "ParentQueue for " + newQueue.getQueueName() - + " is not properly set" + "ParentQueue for " + newQueue + " is not properly set" + " (should be set and be a PlanQueue or ManagedParentQueue)"); } @@ -2027,6 +2059,7 @@ public class CapacityScheduler extends String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); + LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); @@ -2039,48 +2072,32 @@ public class CapacityScheduler extends try { writeLock.lock(); LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); - AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue - .getParent(); + AbstractManagedParentQueue parent = + (AbstractManagedParentQueue) queue.getParent(); - if (!(queue instanceof AutoCreatedLeafQueue)) { + if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom( + queue.getClass()))) { throw new SchedulerDynamicEditException( "Entitlement can not be" + " modified dynamically since queue " + inQueue + " is not a AutoCreatedLeafQueue"); } - if (parent == null - || !(AbstractManagedParentQueue.class.isAssignableFrom( - parent.getClass()))) { + if (parent == 
null || !(AbstractManagedParentQueue.class.isAssignableFrom( + parent.getClass()))) { throw new SchedulerDynamicEditException( "The parent of AutoCreatedLeafQueue " + inQueue + " must be a PlanQueue/ManagedParentQueue"); } - AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; + AbstractAutoCreatedLeafQueue newQueue = + (AbstractAutoCreatedLeafQueue) queue; + parent.validateQueueEntitlementChange(newQueue, entitlement); - float sumChilds = parent.sumOfChildCapacities(); - float newChildCap = - sumChilds - queue.getCapacity() + entitlement.getCapacity(); + newQueue.setEntitlement(entitlement); - if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { - // note: epsilon checks here are not ok, as the epsilons might - // accumulate and become a problem in aggregate - if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 - && Math.abs( - entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { - return; - } - newQueue.setEntitlement(entitlement); - } else{ - throw new SchedulerDynamicEditException( - "Sum of child queues should exceed 100% for auto creating parent " - + "queue : " + parent.getQueueName()); - } - LOG.info( - "Set entitlement for AutoCreatedLeafQueue " + inQueue - + " to " + queue.getCapacity() + - " request was (" + entitlement.getCapacity() - + ")"); + LOG.info("Set entitlement for AutoCreatedLeafQueue " + inQueue + " to " + + queue.getCapacity() + " request was (" + entitlement.getCapacity() + + ")"); } finally { writeLock.unlock(); } @@ -2718,7 +2735,6 @@ public class CapacityScheduler extends addQueue(autoCreatedLeafQueue); - //TODO - Set entitlement through capacity management policy } else{ throw new SchedulerDynamicEditException( "Could not auto-create leaf queue for " + leafQueueName 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index a33d81a..8aa41ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -923,6 +923,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY); } + @VisibleForTesting + public void setNodeLocalityDelay(int nodeLocalityDelay) { + setInt(NODE_LOCALITY_DELAY, nodeLocalityDelay); + } + public int getRackLocalityAdditionalDelay() { return getInt(RACK_LOCALITY_ADDITIONAL_DELAY, DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY); @@ -1401,6 +1406,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return maxApplicationsPerQueue; } + public void setGlobalMaximumApplicationsPerQueue(int val) { + setInt(QUEUE_GLOBAL_MAX_APPLICATION, val); + } + /** * Ordering policy inside a parent queue to sort queues */ @@ -1621,8 +1630,12 @@ public class 
CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false; @Private + private static final String AUTO_CREATE_CHILD_QUEUE_PREFIX = + "auto-create-child-queue."; + + @Private public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = - "auto-create-child-queue.enabled"; + AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled"; @Private public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = @@ -1722,8 +1735,83 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur } @Private + public static final String AUTO_CREATED_QUEUE_MANAGEMENT_POLICY = + AUTO_CREATE_CHILD_QUEUE_PREFIX + "management-policy"; + + @Private + public static final String DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity" + + ".queuemanagement." + + "GuaranteedOrZeroCapacityOverTimePolicy"; + + @Private + private static final String QUEUE_MANAGEMENT_CONFIG_PREFIX = + "yarn.resourcemanager.monitor.capacity.queue-management."; + + /** + * Time in milliseconds between invocations of this policy + */ + @Private + public static final String QUEUE_MANAGEMENT_MONITORING_INTERVAL = + QUEUE_MANAGEMENT_CONFIG_PREFIX + "monitoring-interval"; + + @Private + public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = + 1500L; + + /** + * Queue Management computation policy for Auto Created queues + * @param queue The queue's path + * @return Configured policy class name + */ + @Private + public String getAutoCreatedQueueManagementPolicy(String queue) { + String autoCreatedQueueManagementPolicy = + get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_MANAGEMENT_POLICY, + DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY); + return autoCreatedQueueManagementPolicy; + } + + /** + * Get The policy class configured to manage capacities for auto created leaf + * queues under the specified parent + * + * @param queueName The parent queue's 
name + * @return The policy class configured to manage capacities for auto created + * leaf queues under the specified parent queue + */ + @Private + protected AutoCreatedQueueManagementPolicy + getAutoCreatedQueueManagementPolicyClass( + String queueName) { + + String queueManagementPolicyClassName = + getAutoCreatedQueueManagementPolicy(queueName); + LOG.info("Using Auto Created Queue Management Policy: " + + queueManagementPolicyClassName + " for queue: " + queueName); + try { + Class queueManagementPolicyClazz = getClassByName( + queueManagementPolicyClassName); + if (AutoCreatedQueueManagementPolicy.class.isAssignableFrom( + queueManagementPolicyClazz)) { + return (AutoCreatedQueueManagementPolicy) ReflectionUtils.newInstance( + queueManagementPolicyClazz, this); + } else{ + throw new YarnRuntimeException( + "Class: " + queueManagementPolicyClassName + " not instance of " + + AutoCreatedQueueManagementPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate " + "AutoCreatedQueueManagementPolicy: " + + queueManagementPolicyClassName + " for queue: " + queueName, + e); + } + } + @VisibleForTesting - public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, + @Private + public void setAutoCreatedLeafQueueConfigCapacity(String queuePath, float val) { String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( queuePath); @@ -1732,13 +1820,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private @VisibleForTesting - public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, + public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath, float val) { String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( queuePath); setMaximumCapacity(leafQueueConfPrefix, val); } + @VisibleForTesting + @Private + public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath, + int val) { + String 
leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setUserLimit(leafQueueConfPrefix, val); + } + + @VisibleForTesting + @Private + public void setAutoCreatedLeafQueueConfigUserLimitFactor(String queuePath, + float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setUserLimitFactor(leafQueueConfPrefix, val); + } + public static String getUnits(String resourceValue) { String units; for (int i = 0; i < resourceValue.length(); i++) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7c918a5..ae74989 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -94,4 +95,11 @@ public interface CapacitySchedulerContext { * @return if configuration is mutable */ boolean isConfigurationMutable(); + + /** + * Get clock from scheduler + * @return Clock + */ + Clock getClock(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index eb50123..30ecd40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -239,7 +239,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; List childQueues = new ArrayList<>(); - AutoCreatedLeafQueue resQueue = new AutoCreatedLeafQueue(csContext, + ReservationQueue resQueue = new ReservationQueue(csContext, defReservationId, (PlanQueue) queue); try { 
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f)); @@ -312,7 +312,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< Map newQueues) throws IOException { // check that all static queues are included in the newQueues list for (Map.Entry e : queues.entrySet()) { - if (!(e.getValue() instanceof AutoCreatedLeafQueue)) { + if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue() + .getClass()))) { String queueName = e.getKey(); CSQueue oldQueue = e.getValue(); CSQueue newQueue = newQueues.get(queueName); @@ -394,7 +395,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< String queueName = e.getKey(); CSQueue existingQueue = e.getValue(); - //TODO - Handle case when auto create is disabled on parent queues if (!newQueues.containsKey(queueName) && !( existingQueue instanceof AutoCreatedLeafQueue && conf .isAutoCreateChildQueueEnabled( http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 41ec4ba..86fcbc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -138,7 +138,14 @@ public class LeafQueue extends AbstractCSQueue { @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { - super(cs, queueName, parent, old); + this(cs, cs.getConfiguration(), queueName, parent, old); + } + + public LeafQueue(CapacitySchedulerContext cs, + CapacitySchedulerConfiguration configuration, + String queueName, CSQueue parent, CSQueue old) throws + IOException { + super(cs, configuration, queueName, parent, old); this.scheduler = cs; this.usersManager = new UsersManager(metrics, this, labelManager, scheduler, @@ -149,17 +156,25 @@ public class LeafQueue extends AbstractCSQueue { if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName - + ", fullname=" + getQueuePath()); + + ", fullname=" + getQueuePath()); } - setupQueueConfigs(cs.getClusterResource()); + setupQueueConfigs(cs.getClusterResource(), configuration); + } protected void setupQueueConfigs(Resource clusterResource) throws IOException { + setupQueueConfigs(clusterResource, csContext.getConfiguration()); + } + + protected void setupQueueConfigs(Resource clusterResource, + CapacitySchedulerConfiguration conf) throws + IOException { try { writeLock.lock(); - super.setupQueueConfigs(clusterResource); + CapacitySchedulerConfiguration schedConf = csContext.getConfiguration(); + super.setupQueueConfigs(clusterResource, conf); this.lastClusterResource = clusterResource; @@ -173,8 +188,6 @@ public class LeafQueue extends AbstractCSQueue { // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) setQueueResourceLimitsInfo(clusterResource); - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - setOrderingPolicy( conf.getAppOrderingPolicy(getQueuePath())); 
@@ -183,11 +196,13 @@ public class LeafQueue extends AbstractCSQueue { maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { - int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); + int maxGlobalPerQueueApps = schedConf + .getGlobalMaximumApplicationsPerQueue(); if (maxGlobalPerQueueApps > 0) { maxApplications = maxGlobalPerQueueApps; } else { - int maxSystemApps = conf.getMaximumSystemApplications(); + int maxSystemApps = schedConf. + getMaximumSystemApplications(); maxApplications = (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); } @@ -218,9 +233,11 @@ public class LeafQueue extends AbstractCSQueue { .join(getAccessibleNodeLabels().iterator(), ','))); } - nodeLocalityDelay = conf.getNodeLocalityDelay(); - rackLocalityAdditionalDelay = conf.getRackLocalityAdditionalDelay(); - rackLocalityFullReset = conf.getRackLocalityFullReset(); + nodeLocalityDelay = schedConf.getNodeLocalityDelay(); + rackLocalityAdditionalDelay = schedConf + .getRackLocalityAdditionalDelay(); + rackLocalityFullReset = schedConf + .getRackLocalityFullReset(); // re-init this since max allocation could have changed this.minimumAllocationFactor = Resources.ratio(resourceCalculator, @@ -507,10 +524,11 @@ public class LeafQueue extends AbstractCSQueue { } } - @Override - public void reinitialize( - CSQueue newlyParsedQueue, Resource clusterResource) - throws IOException { + protected void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource, + CapacitySchedulerConfiguration configuration) throws + IOException { + try { writeLock.lock(); // Sanity check @@ -535,7 +553,7 @@ public class LeafQueue extends AbstractCSQueue { + newMax); } - setupQueueConfigs(clusterResource); + setupQueueConfigs(clusterResource, configuration); // queue metrics are updated, more resource may be available // activate the pending applications if possible @@ -547,6 +565,14 @@ public class LeafQueue extends AbstractCSQueue { } 
@Override + public void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + reinitialize(newlyParsedQueue, clusterResource, + csContext.getConfiguration()); + } + + @Override public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { // Careful! Locking order is important! @@ -731,7 +757,7 @@ public class LeafQueue extends AbstractCSQueue { } } - private void activateApplications() { + protected void activateApplications() { try { writeLock.lock(); // limit of allowed resource usage for application masters @@ -1991,10 +2017,18 @@ public class LeafQueue extends AbstractCSQueue { queueCapacities.setCapacity(capacity); } + public void setCapacity(String nodeLabel, float capacity) { + queueCapacities.setCapacity(nodeLabel, capacity); + } + public void setAbsoluteCapacity(float absoluteCapacity) { queueCapacities.setAbsoluteCapacity(absoluteCapacity); } + public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) { + queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity); + } + public void setMaxApplicationsPerUser(int maxApplicationsPerUser) { this.maxApplicationsPerUser = maxApplicationsPerUser; } @@ -2002,7 +2036,12 @@ public class LeafQueue extends AbstractCSQueue { public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } - + + public void setMaxAMResourcePerQueuePercent( + float maxAMResourcePerQueuePercent) { + this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; + } + public OrderingPolicy getOrderingPolicy() { return orderingPolicy; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org