Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 11232200D4F for ; Tue, 21 Nov 2017 19:19:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0F96E160BE3; Tue, 21 Nov 2017 18:19:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 99FB1160C0F for ; Tue, 21 Nov 2017 19:19:23 +0100 (CET) Received: (qmail 34166 invoked by uid 500); 21 Nov 2017 18:19:18 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 32617 invoked by uid 99); 21 Nov 2017 18:19:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Nov 2017 18:19:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D50F8F5FA0; Tue, 21 Nov 2017 18:19:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Tue, 21 Nov 2017 18:19:27 -0000 Message-Id: <0743f1196ce6489db7eac14d96656583@git.apache.org> In-Reply-To: <32deca020aaa482fbfdaed6efdd17884@git.apache.org> References: <32deca020aaa482fbfdaed6efdd17884@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/29] hadoop git commit: YARN-7419. CapacityScheduler: Allow auto leaf queue creation after queue mapping. (Suma Shivaprasad via wangda) archived-at: Tue, 21 Nov 2017 18:19:26 -0000 YARN-7419. CapacityScheduler: Allow auto leaf queue creation after queue mapping. (Suma Shivaprasad via wangda) Change-Id: Ia1704bb8cb5070e5b180b5a85787d7b9ca57ebc6 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0987a7b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0987a7b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0987a7b8 Branch: refs/heads/HDFS-7240 Commit: 0987a7b8cbbbb2c1e4c2095821d98a7db19644df Parents: f2efaf0 Author: Wangda Tan Authored: Thu Nov 16 11:22:48 2017 -0800 Committer: Wangda Tan Committed: Thu Nov 16 11:25:52 2017 -0800 ---------------------------------------------------------------------- .../server/resourcemanager/RMAppManager.java | 7 +- .../placement/ApplicationPlacementContext.java | 52 ++ .../placement/PlacementManager.java | 34 +- .../placement/PlacementRule.java | 7 +- .../UserGroupMappingPlacementRule.java | 284 ++++++- .../server/resourcemanager/rmapp/RMAppImpl.java | 87 +- .../scheduler/capacity/AbstractCSQueue.java | 2 +- .../capacity/AbstractManagedParentQueue.java | 196 +++-- .../capacity/AutoCreatedLeafQueue.java | 27 +- .../scheduler/capacity/CapacityScheduler.java | 157 +++- .../CapacitySchedulerConfiguration.java | 153 ++++ .../capacity/CapacitySchedulerQueueManager.java | 103 ++- .../scheduler/capacity/ManagedParentQueue.java | 158 ++++ .../scheduler/capacity/ParentQueue.java | 13 - .../scheduler/capacity/PlanQueue.java | 25 +- .../scheduler/event/AppAddedSchedulerEvent.java | 37 +- .../server/resourcemanager/TestAppManager.java | 29 +- .../TestUserGroupMappingPlacementRule.java | 14 +- .../scheduler/TestSchedulerUtils.java | 1 + .../capacity/TestCapacityScheduler.java | 6 +- .../TestCapacitySchedulerAutoQueueCreation.java | 794 +++++++++++++++++++ 21 files changed, 1921 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index d042590..5e82f40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -360,13 +360,8 @@ public class RMAppManager implements EventHandler, private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery, long startTime) throws YarnException { + if (!isRecovery) { - // Do queue mapping - if (rmContext.getQueuePlacementManager() != null) { - // We only do queue mapping when it's a new application - rmContext.getQueuePlacementManager().placeApplication( - submissionContext, user); - } // fail the submission if configured application timeout value is invalid RMServerUtils.validateApplicationTimeouts( submissionContext.getApplicationTimeouts()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java new file mode 100644 index 0000000..f2f92b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +/** + * Each placement rule when it successfully places an application onto a queue + * returns a PlacementRuleContext which encapsulates the queue the + * application was mapped to and any parent queue for the queue (if configured) + */ +public class ApplicationPlacementContext { + + private String queue; + + private String parentQueue; + + public ApplicationPlacementContext(String queue) { + this(queue,null); + } + + public ApplicationPlacementContext(String queue, String parentQueue) { + this.queue = queue; + this.parentQueue = parentQueue; + } + + public String getQueue() { + return queue; + } + + public String getParentQueue() { + return parentQueue; + } + + public boolean hasParentQueue() { + return parentQueue != null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java index 43a4deb..c006738 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -53,36 +52,33 @@ public class PlacementManager { } } - public void placeApplication(ApplicationSubmissionContext asc, String user) - throws YarnException { + public ApplicationPlacementContext placeApplication( + ApplicationSubmissionContext asc, String user) throws YarnException { + try { readLock.lock(); + if (null == rules || rules.isEmpty()) { - return; + return null; } - - String newQueueName = null; + + ApplicationPlacementContext placement = null; for (PlacementRule rule : rules) { - newQueueName = rule.getQueueForApp(asc, user); - if (newQueueName != null) { + placement = rule.getPlacementForApp(asc, user); + if (placement != null) { break; } } - + // Failed to get where to place application - if (null == newQueueName && null == asc.getQueue()) { - String msg = "Failed to get where to place application=" - + asc.getApplicationId(); + if (null == placement && null == asc.getQueue()) { + String msg = "Failed to get where to place application=" + asc + .getApplicationId(); LOG.error(msg); throw new YarnException(msg); } - - // Set it to ApplicationSubmissionContext - if (!StringUtils.equals(asc.getQueue(), newQueueName)) { - LOG.info("Placed application=" + asc.getApplicationId() + " to queue=" - + newQueueName + ", original queue=" + asc.getQueue()); - asc.setQueue(newQueueName); - } + + return placement; } finally { readLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java index 47dc48a..a9d5e33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; public abstract class PlacementRule { + public String getName() { return this.getClass().getName(); } @@ -50,6 +51,6 @@ public abstract class PlacementRule { * in the {@link PlacementManager} will take care *

*/ - public abstract String getQueueForApp(ApplicationSubmissionContext asc, - String user) throws YarnException; -} + public abstract ApplicationPlacementContext getPlacementForApp( + ApplicationSubmissionContext asc, String user) throws YarnException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index d617d16..9901f4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -32,6 +34,15 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; public class UserGroupMappingPlacementRule extends PlacementRule { private static final Log LOG = LogFactory @@ -66,17 +77,41 @@ public class UserGroupMappingPlacementRule extends PlacementRule { MappingType type; String source; String queue; + String parentQueue; + + public final static String DELIMITER = ":"; public QueueMapping(MappingType type, String source, String queue) { this.type = type; this.source = source; this.queue = queue; + this.parentQueue = null; } - + + public QueueMapping(MappingType type, String source, + String queue, String parentQueue) { + this.type = type; + this.source = source; + this.queue = queue; + this.parentQueue = parentQueue; + } + public String getQueue() { return queue; } - + + public String getParentQueue() { + return parentQueue; + } + + public MappingType getType() { + return type; + } + + public String getSource() { + return source; + } + @Override public int hashCode() { return super.hashCode(); @@ -93,6 +128,13 @@ public class UserGroupMappingPlacementRule extends PlacementRule { return false; } } + + public String toString() { + return type.toString() + DELIMITER + source + DELIMITER + + (parentQueue != null ? + parentQueue + "." + queue : + queue); + } } public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, @@ -102,26 +144,27 @@ public class UserGroupMappingPlacementRule extends PlacementRule { this.groups = groups; } - private String getMappedQueue(String user) throws IOException { + private ApplicationPlacementContext getPlacementForUser(String user) + throws IOException { for (QueueMapping mapping : mappings) { if (mapping.type == MappingType.USER) { if (mapping.source.equals(CURRENT_USER_MAPPING)) { if (mapping.queue.equals(CURRENT_USER_MAPPING)) { - return user; + return getPlacementContext(mapping, user); } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { - return groups.getGroups(user).get(0); + return getPlacementContext(mapping, groups.getGroups(user).get(0)); } else { - return mapping.queue; + return getPlacementContext(mapping); } } if (user.equals(mapping.source)) { - return mapping.queue; + return getPlacementContext(mapping); } } if (mapping.type == MappingType.GROUP) { for (String userGroups : groups.getGroups(user)) { if (userGroups.equals(mapping.source)) { - return mapping.queue; + return getPlacementContext(mapping); } } } @@ -130,13 +173,14 @@ public class UserGroupMappingPlacementRule extends PlacementRule { } @Override - public String getQueueForApp(ApplicationSubmissionContext asc, String user) + public ApplicationPlacementContext getPlacementForApp( + ApplicationSubmissionContext asc, String user) throws YarnException { String queueName = asc.getQueue(); ApplicationId applicationId = asc.getApplicationId(); if (mappings != null && mappings.size() > 0) { try { - String mappedQueue = getMappedQueue(user); + ApplicationPlacementContext mappedQueue = getPlacementForUser(user); if (mappedQueue != null) { // We have a mapping, should we use it? if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) @@ -153,10 +197,224 @@ public class UserGroupMappingPlacementRule extends PlacementRule { throw new YarnException(message); } } - - return queueName; + return null; } - + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping) { + return getPlacementContext(mapping, mapping.getQueue()); + } + + private ApplicationPlacementContext getPlacementContext(QueueMapping mapping, + String leafQueueName) { + if (!StringUtils.isEmpty(mapping.parentQueue)) { + return new ApplicationPlacementContext(leafQueueName, + mapping.getParentQueue()); + } else{ + return new ApplicationPlacementContext(leafQueueName); + } + } + + @VisibleForTesting + public static UserGroupMappingPlacementRule get( + CapacitySchedulerContext schedulerContext) throws IOException { + CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration(); + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info( + "Initialized queue mappings, override: " + overrideWithQueueMappings); + + List queueMappings = conf.getQueueMappings(); + + // Get new user/group mappings + List newMappings = new ArrayList<>(); + + CapacitySchedulerQueueManager queueManager = + schedulerContext.getCapacitySchedulerQueueManager(); + + // check if mappings refer to valid queues + for (QueueMapping mapping : queueMappings) { + + QueuePath queuePath = extractQueuePath(mapping.getQueue()); + if (isStaticQueueMapping(mapping)) { + //Try getting queue by its leaf queue name + // without splitting into parent/leaf queues + CSQueue queue = queueManager.getQueue(mapping.getQueue()); + if (ifQueueDoesNotExist(queue)) { + //Try getting the queue by extracting leaf and parent queue names + //Assuming its a potential auto created leaf queue + queue = queueManager.getQueue(queuePath.getLeafQueue()); + + if (ifQueueDoesNotExist(queue)) { + //if leaf queue does not exist, + // this could be a potential auto created leaf queue + //validate if parent queue is specified, + // then it should exist and + // be an instance of AutoCreateEnabledParentQueue + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping, queuePath); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping + .getQueue()); + } + newMappings.add(newMapping); + } else{ + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, + queue, mapping, queuePath); + newMappings.add(newMapping); + } + } else{ + // if queue exists, validate + // if its an instance of leaf queue + // if its an instance of auto created leaf queue, + // then extract parent queue name and update queue mapping + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, + queue, mapping, queuePath); + newMappings.add(newMapping); + } + } else{ + //If it is a dynamic queue mapping, + // we can safely assume leaf queue name does not have '.' in it + // validate + // if parent queue is specified, then + // parent queue exists and an instance of AutoCreateEnabledParentQueue + // + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping, queuePath); + if (newMapping != null) { + newMappings.add(newMapping); + } else{ + newMappings.add(mapping); + } + } + } + + // initialize groups if mappings are present + if (newMappings.size() > 0) { + Groups groups = new Groups(conf); + return new UserGroupMappingPlacementRule(overrideWithQueueMappings, + newMappings, groups); + } + + return null; + } + + private static QueueMapping validateAndGetQueueMapping( + CapacitySchedulerQueueManager queueManager, CSQueue queue, + QueueMapping mapping, QueuePath queuePath) throws IOException { + if (!(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue : " + mapping.getQueue()); + } + + if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping, queuePath); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.getQueue()); + } + return newMapping; + } + return mapping; + } + + private static boolean ifQueueDoesNotExist(CSQueue queue) { + return queue == null; + } + + private static QueueMapping validateAndGetAutoCreatedQueueMapping( + CapacitySchedulerQueueManager queueManager, QueueMapping mapping, + QueuePath queuePath) throws IOException { + if (queuePath.hasParentQueue()) { + //if parent queue is specified, + // then it should exist and be an instance of ManagedParentQueue + validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()), + queuePath.getParentQueue(), queuePath.getLeafQueue()); + return new QueueMapping(mapping.getType(), mapping.getSource(), + queuePath.getLeafQueue(), queuePath.getParentQueue()); + } + + return null; + } + + private static boolean isStaticQueueMapping(QueueMapping mapping) { + return !mapping.getQueue().contains( + UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mapping + .getQueue().contains( + UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING); + } + + private static class QueuePath { + + public String parentQueue; + public String leafQueue; + + public QueuePath(final String leafQueue) { + this.leafQueue = leafQueue; + } + + public QueuePath(final String parentQueue, final String leafQueue) { + this.parentQueue = parentQueue; + this.leafQueue = leafQueue; + } + + public String getParentQueue() { + return parentQueue; + } + + public String getLeafQueue() { + return leafQueue; + } + + public boolean hasParentQueue() { + return parentQueue != null; + } + + @Override + public String toString() { + return parentQueue + DOT + leafQueue; + } + } + + private static QueuePath extractQueuePath(String queueName) + throws IOException { + int parentQueueNameEndIndex = queueName.lastIndexOf(DOT); + + if (parentQueueNameEndIndex > -1) { + final String parentQueue = queueName.substring(0, parentQueueNameEndIndex) + .trim(); + final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1) + .trim(); + return new QueuePath(parentQueue, leafQueue); + } + + return new QueuePath(queueName); + } + + private static void validateParentQueue(CSQueue parentQueue, + String parentQueueName, String leafQueueName) throws IOException { + if (parentQueue == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue [" + leafQueueName + + "] and invalid parent queue [" + parentQueueName + "]"); + } else if (!(parentQueue instanceof ManagedParentQueue)) { + throw new IOException("mapping contains leaf queue [" + leafQueueName + + "] and invalid parent queue which " + + "does not have auto creation of leaf queues enabled [" + + parentQueueName + "]"); + } else if (!parentQueue.getQueueName().equals(parentQueueName)) { + throw new IOException( + "mapping contains invalid or non-leaf queue [" + leafQueueName + + "] and invalid parent queue " + + "which does not match existing leaf queue's parent : [" + + parentQueueName + "] does not match [ " + parentQueue + .getQueueName() + "]"); + } + } + @VisibleForTesting public List getQueueMappings() { return mappings; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ae5f6b4..85d355f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; @@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -158,6 +161,8 @@ public class RMAppImpl implements RMApp, Recoverable { private boolean isNumAttemptsBeyondThreshold = false; + + // Mutable fields private long startTime; private long finishTime = 0; @@ -1073,38 +1078,51 @@ public class RMAppImpl implements RMApp, Recoverable { app.getUser(), BuilderUtils.parseTokensConf(app.submissionContext)); } catch (Exception e) { - String msg = "Failed to fetch user credentials from application:" - + e.getMessage(); + String msg = "Failed to fetch user credentials from application:" + e + .getMessage(); app.diagnostics.append(msg); LOG.error(msg, e); } } - for (Map.Entry timeout : - app.applicationTimeouts.entrySet()) { + for (Map.Entry timeout : app.applicationTimeouts + .entrySet()) { app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, timeout.getKey(), timeout.getValue()); if (LOG.isDebugEnabled()) { long remainingTime = timeout.getValue() - app.systemClock.getTime(); LOG.debug("Application " + app.applicationId + " is registered for timeout monitor, type=" + timeout.getKey() - + " remaining timeout=" - + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds"); + + " remaining timeout=" + (remainingTime > 0 ? + remainingTime / 1000 : + 0) + " seconds"); } } + ApplicationPlacementContext placementContext = null; + try { + placementContext = placeApplication(app.rmContext, + app.submissionContext, app.user); + } catch (Exception e) { + String msg = "Failed to place application to queue :" + e.getMessage(); + app.diagnostics.append(msg); + LOG.error(msg, e); + } + // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { - app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false, app.applicationPriority)); + app.scheduler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, false, + app.applicationPriority, placementContext)); return RMAppState.SUBMITTED; } // Add application to scheduler synchronously to guarantee scheduler // knows applications before AM or NM re-registers. - app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, true, app.applicationPriority)); + app.scheduler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, true, + app.applicationPriority, placementContext)); // recover attempts app.recoverAppAttempts(); @@ -1120,8 +1138,20 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false, app.applicationPriority)); + ApplicationPlacementContext placementContext = null; + try { + placementContext = placeApplication(app.rmContext, + app.submissionContext, app.user); + replaceQueueFromPlacementContext(placementContext, + app.submissionContext); + } catch (YarnException e) { + String msg = "Failed to place application to queue :" + e.getMessage(); + app.diagnostics.append(msg); + LOG.error(msg, e); + } + app.handler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, false, + app.applicationPriority, placementContext)); // send the ATS create Event app.sendATSCreateEvent(); } @@ -2013,4 +2043,37 @@ public class RMAppImpl implements RMApp, Recoverable { this.submissionContext.setAMContainerSpec(null); this.submissionContext.setLogAggregationContext(null); } + + @VisibleForTesting + static ApplicationPlacementContext placeApplication(RMContext rmContext, + ApplicationSubmissionContext context, String user) throws YarnException { + + ApplicationPlacementContext placementContext = null; + PlacementManager placementManager = rmContext.getQueuePlacementManager(); + + if (placementManager != null) { + placementContext = placementManager.placeApplication(context, user); + } else{ + LOG.error( + "Queue Placement Manager is null. Cannot place application :" + " " + + context.getApplicationId() + " to queue "); + } + + return placementContext; + } + + static void replaceQueueFromPlacementContext( + ApplicationPlacementContext placementContext, + ApplicationSubmissionContext context) { + // Set it to ApplicationSubmissionContext + //apply queue mapping only to new application submissions + if (placementContext != null && !StringUtils.equals(context.getQueue(), + placementContext.getQueue())) { + LOG.info("Placed application=" + context.getApplicationId() + " to queue=" + + placementContext.getQueue() + ", original queue=" + context + .getQueue()); + context.setQueue(placementContext.getQueue()); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 183cb36..74c85ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -78,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue { final String queueName; private final String queuePath; volatile int numContainers; - + final Resource minimumAllocation; volatile Resource maximumAllocation; private volatile QueueState state = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index b3d1b47..46f5cf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -35,31 +35,13 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { private static final Logger LOG = LoggerFactory.getLogger( AbstractManagedParentQueue.class); - private int maxAppsForAutoCreatedQueues; - private int maxAppsPerUserForAutoCreatedQueues; - private int userLimit; - private float userLimitFactor; + protected AutoCreatedLeafQueueTemplate leafQueueTemplate; public AbstractManagedParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); super.setupQueueConfigs(csContext.getClusterResource()); - initializeLeafQueueConfigs(); - - StringBuffer queueInfo = new StringBuffer(); - queueInfo.append("Created Managed Parent Queue: ").append(queueName) - .append("\nof type : [" + getClass()) - .append("]\nwith capacity: [") - .append(super.getCapacity()).append("]\nwith max capacity: [") - .append(super.getMaximumCapacity()).append("\nwith max apps: [") - .append(getMaxApplicationsForAutoCreatedQueues()) - .append("]\nwith max apps per user: [") - .append(getMaxApplicationsPerUserForAutoCreatedQueues()) - .append("]\nwith user limit: [").append(getUserLimit()) - .append("]\nwith user limit factor: [") - .append(getUserLimitFactor()).append("]."); - LOG.info(queueInfo.toString()); } @Override @@ -71,8 +53,6 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { // Set new configs setupQueueConfigs(clusterResource); - initializeLeafQueueConfigs(); - // run reinitialize on each existing queue, to trigger absolute cap // recomputations for (CSQueue res : this.getChildQueues()) { @@ -87,72 +67,29 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { * Initialize leaf queue configs from template configurations specified on * parent queue. */ - protected void initializeLeafQueueConfigs() { + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + (String queuePath) { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - final String queuePath = super.getQueuePath(); + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new + AutoCreatedLeafQueueTemplate.Builder(); int maxApps = conf.getMaximumApplicationsPerQueue(queuePath); if (maxApps < 0) { maxApps = (int) ( CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * getAbsoluteCapacity()); } - userLimit = conf.getUserLimit(queuePath); - userLimitFactor = conf.getUserLimitFactor(queuePath); - maxAppsForAutoCreatedQueues = maxApps; - maxAppsPerUserForAutoCreatedQueues = - (int) (maxApps * (userLimit / 100.0f) * userLimitFactor); - - } - - /** - * Number of maximum applications for each of the auto created leaf queues. - * - * @return maxAppsForAutoCreatedQueues - */ - public int getMaxApplicationsForAutoCreatedQueues() { - return maxAppsForAutoCreatedQueues; - } - - /** - * Number of maximum applications per user for each of the auto created - * leaf queues. - * - * @return maxAppsPerUserForAutoCreatedQueues - */ - public int getMaxApplicationsPerUserForAutoCreatedQueues() { - return maxAppsPerUserForAutoCreatedQueues; - } - - /** - * User limit value for each of the auto created leaf queues. - * - * @return userLimit - */ - public int getUserLimitForAutoCreatedQueues() { - return userLimit; - } - - /** - * User limit factor value for each of the auto created leaf queues. - * - * @return userLimitFactor - */ - public float getUserLimitFactor() { - return userLimitFactor; - } - public int getMaxAppsForAutoCreatedQueues() { - return maxAppsForAutoCreatedQueues; - } + int userLimit = conf.getUserLimit(queuePath); + float userLimitFactor = conf.getUserLimitFactor(queuePath); + leafQueueTemplateBuilder.userLimit(userLimit) + .userLimitFactor(userLimitFactor) + .maxApps(maxApps) + .maxAppsPerUser( + (int) (maxApps * (userLimit / 100.0f) * userLimitFactor)); - public int getMaxAppsPerUserForAutoCreatedQueues() { - return maxAppsPerUserForAutoCreatedQueues; - } - - public int getUserLimit() { - return userLimit; + return leafQueueTemplateBuilder; } /** @@ -229,4 +166,111 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { } return childQueue; } + + protected float sumOfChildCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } + + protected float sumOfChildAbsCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getAbsoluteCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } + + public static class AutoCreatedLeafQueueTemplate { + + private QueueCapacities queueCapacities; + + private int maxApps; + private int maxAppsPerUser; + private int userLimit; + private float userLimitFactor; + + AutoCreatedLeafQueueTemplate(Builder builder) { + this.maxApps = builder.maxApps; + this.maxAppsPerUser = builder.maxAppsPerUser; + this.userLimit = builder.userLimit; + this.userLimitFactor = builder.userLimitFactor; + this.queueCapacities = builder.queueCapacities; + } + + public static class Builder { + private int maxApps; + private int maxAppsPerUser; + + private int userLimit; + private float userLimitFactor; + + private QueueCapacities queueCapacities; + + Builder maxApps(int maxApplications) { + this.maxApps = maxApplications; + return this; + } + + Builder maxAppsPerUser(int maxApplicationsPerUser) { + this.maxAppsPerUser = maxApplicationsPerUser; + return this; + } + + Builder userLimit(int usrLimit) { + this.userLimit = usrLimit; + return this; + } + + Builder userLimitFactor(float ulf) { + this.userLimitFactor = ulf; + return this; + } + + Builder capacities(QueueCapacities capacities) { + this.queueCapacities = capacities; + return this; + } + + AutoCreatedLeafQueueTemplate build() { + return new AutoCreatedLeafQueueTemplate(this); + } + } + + public int getUserLimit() { + return userLimit; + } + + public float getUserLimitFactor() { + return userLimitFactor; + } + + public QueueCapacities getQueueCapacities() { + return queueCapacities; + } + + public int getMaxApps() { + return maxApps; + } + + public int getMaxAppsPerUser() { + return maxAppsPerUser; + } + } + + public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() { + return leafQueueTemplate; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index 4eb7cdd..bc206d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,17 +44,18 @@ public class AutoCreatedLeafQueue extends LeafQueue { AbstractManagedParentQueue parent) throws IOException { super(cs, queueName, parent, null); - updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForAutoCreatedQueues(), - parent.getMaxApplicationsPerUserForAutoCreatedQueues()); - + AutoCreatedLeafQueueTemplate leafQueueTemplate = + parent.getLeafQueueTemplate(); + updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), + leafQueueTemplate.getUserLimitFactor(), + leafQueueTemplate.getMaxApps(), + leafQueueTemplate.getMaxAppsPerUser()); this.parent = parent; } @Override - public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { try { writeLock.lock(); @@ -62,10 +65,12 @@ public class AutoCreatedLeafQueue extends LeafQueue { CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, null); - updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForAutoCreatedQueues(), - parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + AutoCreatedLeafQueueTemplate leafQueueTemplate = + parent.getLeafQueueTemplate(); + updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), + leafQueueTemplate.getUserLimitFactor(), + leafQueueTemplate.getMaxApps(), + leafQueueTemplate.getMaxAppsPerUser()); } finally { writeLock.unlock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6f630f8..ed30ad1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -64,10 +63,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; -import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; @@ -146,6 +145,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QUEUE_MAPPING; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -560,44 +561,17 @@ public class CapacityScheduler extends } @VisibleForTesting - public UserGroupMappingPlacementRule - getUserGroupMappingPlacementRule() throws IOException { + public PlacementRule getUserGroupMappingPlacementRule() throws IOException { try { readLock.lock(); - boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); - LOG.info( - "Initialized queue mappings, override: " + overrideWithQueueMappings); - - // Get new user/group mappings - List newMappings = conf.getQueueMappings(); - // check if mappings refer to valid queues - for (QueueMapping mapping : newMappings) { - String mappingQueue = mapping.getQueue(); - if (!mappingQueue.equals( - UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue - .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { - CSQueue queue = getQueue(mappingQueue); - if (queue == null || !(queue instanceof LeafQueue)) { - throw new IOException( - "mapping contains invalid or non-leaf queue " + mappingQueue); - } - } - } - - // initialize groups if mappings are present - if (newMappings.size() > 0) { - Groups groups = new Groups(conf); - return new UserGroupMappingPlacementRule(overrideWithQueueMappings, - newMappings, groups); - } - - return null; + return UserGroupMappingPlacementRule.get(this); } finally { readLock.unlock(); } } - private void updatePlacementRules() throws IOException { + @VisibleForTesting + void updatePlacementRules() throws IOException { // Initialize placement rules Collection placementRuleStrs = conf.getStringCollection( YarnConfiguration.QUEUE_PLACEMENT_RULES); @@ -731,37 +705,92 @@ public class CapacityScheduler extends } } - private void addApplication(ApplicationId applicationId, - String queueName, String user, Priority priority) { + private void addApplication(ApplicationId applicationId, String queueName, + String user, Priority priority, + ApplicationPlacementContext placementContext) { try { writeLock.lock(); if (isSystemAppsLimitReached()) { String message = "Maximum system application limit reached," + "cannot accept submission of application: " + applicationId; - this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( - applicationId, RMAppEventType.APP_REJECTED, message)); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); return; } // Sanity checks. CSQueue queue = getQueue(queueName); + + if (queue == null && placementContext != null) { + //Could be a potential auto-created leaf queue + try { + queue = autoCreateLeafQueue(placementContext); + } catch (YarnException | IOException e) { + LOG.error("Could not auto-create leaf queue due to : ", e); + final String message = + "Application " + applicationId + " submission by user : " + user + + " to queue : " + queueName + " failed : " + e.getMessage(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + } + } + if (queue == null) { - String message = + final String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, message)); return; } + if (!(queue instanceof LeafQueue)) { String message = - "Application " + applicationId + " submitted by user " + user - + " to non-leaf queue: " + queueName; + "Application " + applicationId + " submitted by user : " + user + + " to non-leaf queue : " + queueName; this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, message)); return; + } else if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + //If queue already exists and auto-queue creation was not required, + //placement context should not be null + if (placementContext == null) { + String message = + "Application " + applicationId + " submission by user : " + user + + " to specified queue : " + queueName + " is prohibited. " + + "Verify automatic queue mapping for user exists in " + + QUEUE_MAPPING; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + // For a queue which exists already and + // not auto-created above, then its parent queue should match + // the parent queue specified in queue mapping + } else if (!queue.getParent().getQueueName().equals( + placementContext.getParentQueue())) { + String message = + "Auto created Leaf queue " + placementContext.getQueue() + " " + + "already exists under queue : " + queue + .getParent().getQueuePath() + + ".But Queue mapping configuration " + + CapacitySchedulerConfiguration.QUEUE_MAPPING + " has been " + + "updated to a different parent queue : " + + placementContext.getParentQueue() + + " for the specified user : " + user; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + } } + // Submit to the queue try { queue.submitApplication(applicationId, user, queueName); @@ -1483,7 +1512,8 @@ public class CapacityScheduler extends if (queueName != null) { if (!appAddedEvent.getIsAppRecovering()) { addApplication(appAddedEvent.getApplicationId(), queueName, - appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); + appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(), + appAddedEvent.getPlacementContext()); } else { addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); @@ -2001,7 +2031,8 @@ public class CapacityScheduler extends try { writeLock.lock(); LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); - ParentQueue parent = (ParentQueue) queue.getParent(); + AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue + .getParent(); if (!(queue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( @@ -2010,7 +2041,8 @@ public class CapacityScheduler extends } if (parent == null - || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) { + || !(AbstractManagedParentQueue.class.isAssignableFrom( + parent.getClass()))) { throw new SchedulerDynamicEditException( "The parent of AutoCreatedLeafQueue " + inQueue + " must be a PlanQueue/ManagedParentQueue"); @@ -2655,4 +2687,43 @@ public class CapacityScheduler extends } return null; } + + private LeafQueue autoCreateLeafQueue( + ApplicationPlacementContext placementContext) + throws IOException, YarnException { + + AutoCreatedLeafQueue autoCreatedLeafQueue = null; + + String leafQueueName = placementContext.getQueue(); + String parentQueueName = placementContext.getParentQueue(); + + if (!StringUtils.isEmpty(parentQueueName)) { + CSQueue parentQueue = getQueue(parentQueueName); + + if (parentQueue != null && conf.isAutoCreateChildQueueEnabled( + parentQueue.getQueuePath())) { + + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName, + autoCreateEnabledParentQueue); + + addQueue(autoCreatedLeafQueue); + + //TODO - Set entitlement through capacity management policy + } else{ + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping specifies an invalid parent queue " + + "which does not exist " + + parentQueueName); + } + } else{ + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping does not specify" + + " which parent queue it needs to be created under."); + } + return autoCreatedLeafQueue; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfead35..4515453 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -907,6 +907,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); } + @Private + @VisibleForTesting + public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) { + setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings); + } + /** * Returns a collection of strings, trimming leading and trailing whitespeace * on each value @@ -981,6 +987,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return mappings; } + @Private + @VisibleForTesting + public void setQueuePlacementRules(Collection queuePlacementRules) { + if (queuePlacementRules == null) { + return; + } + String str = StringUtils.join(",", queuePlacementRules); + setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str); + } + + @Private + @VisibleForTesting + public void setQueueMappings(List queueMappings) { + if (queueMappings == null) { + return; + } + + List queueMappingStrs = new ArrayList<>(); + for (QueueMapping mapping : queueMappings) { + queueMappingStrs.add(mapping.toString()); + } + + setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs)); + } + public boolean isReservable(String queue) { boolean isReservable = getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); @@ -1523,4 +1554,126 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime); } + + @Private + public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = + "auto-create-child-queue.enabled"; + + @Private + public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = + "leaf-queue-template"; + + @Private + public static final String AUTO_CREATE_QUEUE_MAX_QUEUES = + "auto-create-child-queue.max-queues"; + + @Private + public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000; + + /** + * If true, this queue will be created as a Parent Queue which Auto Created + * leaf child queues + * + * @param queuePath The queues path + * @return true if auto create is enabled for child queues else false. Default + * is false + */ + @Private + public boolean isAutoCreateChildQueueEnabled(String queuePath) { + boolean isAutoCreateEnabled = getBoolean( + getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED); + return isAutoCreateEnabled; + } + + @Private + @VisibleForTesting + public void setAutoCreateChildQueueEnabled(String queuePath, + boolean autoCreationEnabled) { + setBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_ENABLED, + autoCreationEnabled); + } + + /** + * Get the auto created leaf queue's template configuration prefix + * Leaf queue's template capacities are configured at the parent queue + * + * @param queuePath parent queue's path + * @return Config prefix for leaf queue template configurations + */ + @Private + public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) { + return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX; + } + + @Private + public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = + "auto-create-child-queue.fail-on-exceeding-parent-capacity"; + + @Private + public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = + false; + + /** + * Fail further auto leaf queue creation when parent's guaranteed capacity is + * exceeded. + * + * @param queuePath the parent queue's path + * @return true if configured to fail else false + */ + @Private + public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + String queuePath) { + boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity = + getBoolean(getQueuePrefix(queuePath) + + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, + DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY); + return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity; + } + + @VisibleForTesting + @Private + public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + String queuePath, boolean autoCreationEnabled) { + setBoolean( + getQueuePrefix(queuePath) + + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, + autoCreationEnabled); + } + + /** + * Get the max number of leaf queues that are allowed to be created under + * a parent queue + * + * @param queuePath the paret queue's path + * @return the max number of leaf queues allowed to be auto created + */ + @Private + public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { + return getInt(getQueuePrefix(queuePath) + + AUTO_CREATE_QUEUE_MAX_QUEUES, + DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES); + } + + @Private + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, + float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setCapacity(leafQueueConfPrefix, val); + } + + @Private + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, + float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setMaximumCapacity(leafQueueConfPrefix, val); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 7be2529..eb50123 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -154,7 +154,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< * @throws IOException if fails to initialize queues */ public void initializeQueues(CapacitySchedulerConfiguration conf) - throws IOException { + throws IOException { root = parseQueue(this.csContext, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); setQueueAcls(authorizer, appPriorityACLManager, queues); @@ -176,7 +176,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< if (!csContext.isConfigurationMutable() || csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { - // Ensure queue hiearchy in the new XML file is proper. + // Ensure queue hierarchy in the new XML file is proper. validateQueueHierarchy(queues, newQueues); } @@ -216,11 +216,13 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< Map oldQueues, QueueHook hook) throws IOException { CSQueue queue; - String fullQueueName = - (parent == null) ? queueName - : (parent.getQueuePath() + "." + queueName); + String fullQueueName = (parent == null) ? + queueName : + (parent.getQueuePath() + "." + queueName); String[] childQueueNames = conf.getQueues(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName); + boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled( + fullQueueName); if (childQueueNames == null || childQueueNames.length == 0) { if (null == parent) { throw new IllegalStateException( @@ -229,9 +231,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< // Check if the queue will be dynamically managed by the Reservation // system if (isReservableQueue) { - queue = - new PlanQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + queue = new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); //initializing the "internal" default queue, for SLS compatibility String defReservationId = @@ -249,38 +250,46 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< ((PlanQueue) queue).setChildQueues(childQueues); queues.put(defReservationId, resQueue); - } else { - queue = - new LeafQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + } else if (isAutoCreateEnabled) { + queue = new ManagedParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else{ + queue = new LeafQueue(csContext, queueName, parent, + oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(queue); } - } else { + } else{ if (isReservableQueue) { throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + + ParentQueue parentQueue; + if (isAutoCreateEnabled) { + parentQueue = new ManagedParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else{ + parentQueue = new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } // Used only for unit tests queue = hook.hook(parentQueue); List childQueues = new ArrayList<>(); for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, - queues, oldQueues, hook); + CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName, + queues, oldQueues, hook); childQueues.add(childQueue); } parentQueue.setChildQueues(childQueues); + } - if (queue instanceof LeafQueue && queues.containsKey(queueName) - && queues.get(queueName) instanceof LeafQueue) { + if (queue instanceof LeafQueue && queues.containsKey(queueName) && queues + .get(queueName) instanceof LeafQueue) { throw new IOException("Two leaf queues were named " + queueName + ". Leaf queue names must be distinct"); } @@ -312,27 +321,46 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< if (oldQueue.getState() == QueueState.STOPPED) { LOG.info("Deleting Queue " + queueName + ", as it is not" + " present in the modified capacity configuration xml"); - } else { + } else{ throw new IOException(oldQueue.getQueuePath() + " is deleted from" + " the new capacity scheduler configuration, but the" - + " queue is not yet in stopped state. " - + "Current State : " + oldQueue.getState()); + + " queue is not yet in stopped state. " + "Current State : " + + oldQueue.getState()); } } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { //Queue's cannot be moved from one hierarchy to other - throw new IOException(queueName + " is moved from:" - + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() - + " after refresh, which is not allowed."); - } else if (oldQueue instanceof LeafQueue + throw new IOException( + queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" + + newQueue.getQueuePath() + + " after refresh, which is not allowed."); + } else if (oldQueue instanceof ParentQueue + && !(oldQueue instanceof ManagedParentQueue) + && newQueue instanceof ManagedParentQueue) { + throw new IOException( + "Can not convert parent queue: " + oldQueue.getQueuePath() + + " to auto create enabled parent queue since " + + "it could have other pre-configured queues which is not " + + "supported"); + } else if (oldQueue instanceof ManagedParentQueue + && !(newQueue instanceof ManagedParentQueue)) { + throw new IOException( + "Cannot convert auto create enabled parent queue: " + oldQueue + .getQueuePath() + " to leaf queue. Please check " + + " parent queue's configuration " + + CapacitySchedulerConfiguration + .AUTO_CREATE_CHILD_QUEUE_ENABLED + + " is set to true"); + } else if (oldQueue instanceof LeafQueue && newQueue instanceof ParentQueue) { if (oldQueue.getState() == QueueState.STOPPED) { LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() + " to parent queue."); - } else { - throw new IOException("Can not convert the leaf queue: " - + oldQueue.getQueuePath() + " to parent queue since " - + "it is not yet in stopped state. Current State : " - + oldQueue.getState()); + } else{ + throw new IOException( + "Can not convert the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue since " + + "it is not yet in stopped state. Current State : " + + oldQueue.getState()); } } else if (oldQueue instanceof ParentQueue && newQueue instanceof LeafQueue) { @@ -352,6 +380,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< */ private void updateQueues(Map existingQueues, Map newQueues) { + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); for (Map.Entry e : newQueues.entrySet()) { String queueName = e.getKey(); CSQueue queue = e.getValue(); @@ -363,7 +392,13 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< .iterator(); itr.hasNext();) { Map.Entry e = itr.next(); String queueName = e.getKey(); - if (!newQueues.containsKey(queueName)) { + CSQueue existingQueue = e.getValue(); + + //TODO - Handle case when auto create is disabled on parent queues + if (!newQueues.containsKey(queueName) && !( + existingQueue instanceof AutoCreatedLeafQueue && conf + .isAutoCreateChildQueueEnabled( + existingQueue.getParent().getQueuePath()))) { itr.remove(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java new file mode 100644 index 0000000..ff795e4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Auto Creation enabled Parent queue. This queue initially does not have any + * children to start with and all child + * leaf queues will be auto created. Currently this does not allow other + * pre-configured leaf or parent queues to + * co-exist along with auto-created leaf queues. The auto creation is limited + * to leaf queues currently. + */ +public class ManagedParentQueue extends AbstractManagedParentQueue { + + private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false; + + private static final Logger LOG = LoggerFactory.getLogger( + ManagedParentQueue.class); + + public ManagedParentQueue(final CapacitySchedulerContext cs, + final String queueName, final CSQueue parent, final CSQueue old) + throws IOException { + super(cs, queueName, parent, old); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + this.leafQueueTemplate = initializeLeafQueueConfigs( + leafQueueTemplateConfPrefix).build(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Managed Parent Queue: ").append(queueName).append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + this.leafQueueTemplate = initializeLeafQueueConfigs( + leafQueueTemplateConfPrefix).build(); + } + + @Override + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs( + String queuePath) { + + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = + super.initializeLeafQueueConfigs(queuePath); + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf); + QueueCapacities queueCapacities = new QueueCapacities(false); + CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix, + csContext.getConfiguration(), queueCapacities, getQueueCapacities()); + leafQueueTemplate.capacities(queueCapacities); + + shouldFailAutoCreationWhenGuaranteedCapacityExceeded = + conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath()); + + return leafQueueTemplate; + } + + protected void validate(final CSQueue newlyParsedQueue) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + } + + @Override + public void addChildQueue(CSQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + + if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "Expected child queue to be an instance of AutoCreatedLeafQueue"); + } + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + ManagedParentQueue parentQueue = + (ManagedParentQueue) childQueue.getParent(); + + String leafQueueName = childQueue.getQueueName(); + int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit( + parentQueue.getQueuePath()); + + if (parentQueue.getChildQueues().size() >= maxQueues) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ".Max Child " + + "Queue limit exceeded which is configured as : " + maxQueues + + " and number of child queues is : " + parentQueue + .getChildQueues().size()); + } + + if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) { + if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity() + + parentQueue.sumOfChildAbsCapacities() > parentQueue + .getAbsoluteCapacity()) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ". Child " + + "queues capacities have reached parent queue : " + + parentQueue.getQueuePath() + " guaranteed capacity"); + } + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + super.addChildQueue(leafQueue); + //TODO - refresh policy queue after capacity management is added + + } finally { + writeLock.unlock(); + } + } + + private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { + return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index d61951b..959ca51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1081,17 +1081,4 @@ public class ParentQueue extends AbstractCSQueue { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } - - protected float sumOfChildCapacities() { - try { - writeLock.lock(); - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); - } - return ret; - } finally { - writeLock.unlock(); - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 4ab2e9f..b7f8aa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -40,6 +40,19 @@ public class PlanQueue extends AbstractManagedParentQueue { public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); + this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Plan Queue: ").append(queueName).append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); } @Override @@ -47,17 +60,21 @@ public class PlanQueue extends AbstractManagedParentQueue { throws IOException { validate(newlyParsedQueue); super.reinitialize(newlyParsedQueue, clusterResource); + this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); } @Override - protected void initializeLeafQueueConfigs() { - String queuePath = super.getQueuePath(); + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + (String queuePath) { + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super + .initializeLeafQueueConfigs + (queuePath); showReservationsAsQueues = csContext.getConfiguration() .getShowReservationAsQueues(queuePath); - super.initializeLeafQueueConfigs(); + return leafQueueTemplate; } - private void validate(final CSQueue newlyParsedQueue) throws IOException { + protected void validate(final CSQueue newlyParsedQueue) throws IOException { // Sanity check if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org