Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
by cust-asf2.ponee.io (Postfix) with ESMTP id 11232200D4F
for ; Tue, 21 Nov 2017 19:19:26 +0100 (CET)
Received: by cust-asf.ponee.io (Postfix)
id 0F96E160BE3; Tue, 21 Nov 2017 18:19:26 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by cust-asf.ponee.io (Postfix) with SMTP id 99FB1160C0F
for ; Tue, 21 Nov 2017 19:19:23 +0100 (CET)
Received: (qmail 34166 invoked by uid 500); 21 Nov 2017 18:19:18 -0000
Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Delivered-To: mailing list common-commits@hadoop.apache.org
Received: (qmail 32617 invoked by uid 99); 21 Nov 2017 18:19:17 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Nov 2017 18:19:17 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
id D50F8F5FA0; Tue, 21 Nov 2017 18:19:16 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: aengineer@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 21 Nov 2017 18:19:27 -0000
Message-Id: <0743f1196ce6489db7eac14d96656583@git.apache.org>
In-Reply-To: <32deca020aaa482fbfdaed6efdd17884@git.apache.org>
References: <32deca020aaa482fbfdaed6efdd17884@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [12/29] hadoop git commit: YARN-7419. CapacityScheduler: Allow auto
leaf queue creation after queue mapping. (Suma Shivaprasad via wangda)
archived-at: Tue, 21 Nov 2017 18:19:26 -0000
YARN-7419. CapacityScheduler: Allow auto leaf queue creation after queue mapping. (Suma Shivaprasad via wangda)
Change-Id: Ia1704bb8cb5070e5b180b5a85787d7b9ca57ebc6
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0987a7b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0987a7b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0987a7b8
Branch: refs/heads/HDFS-7240
Commit: 0987a7b8cbbbb2c1e4c2095821d98a7db19644df
Parents: f2efaf0
Author: Wangda Tan
Authored: Thu Nov 16 11:22:48 2017 -0800
Committer: Wangda Tan
Committed: Thu Nov 16 11:25:52 2017 -0800
----------------------------------------------------------------------
.../server/resourcemanager/RMAppManager.java | 7 +-
.../placement/ApplicationPlacementContext.java | 52 ++
.../placement/PlacementManager.java | 34 +-
.../placement/PlacementRule.java | 7 +-
.../UserGroupMappingPlacementRule.java | 284 ++++++-
.../server/resourcemanager/rmapp/RMAppImpl.java | 87 +-
.../scheduler/capacity/AbstractCSQueue.java | 2 +-
.../capacity/AbstractManagedParentQueue.java | 196 +++--
.../capacity/AutoCreatedLeafQueue.java | 27 +-
.../scheduler/capacity/CapacityScheduler.java | 157 +++-
.../CapacitySchedulerConfiguration.java | 153 ++++
.../capacity/CapacitySchedulerQueueManager.java | 103 ++-
.../scheduler/capacity/ManagedParentQueue.java | 158 ++++
.../scheduler/capacity/ParentQueue.java | 13 -
.../scheduler/capacity/PlanQueue.java | 25 +-
.../scheduler/event/AppAddedSchedulerEvent.java | 37 +-
.../server/resourcemanager/TestAppManager.java | 29 +-
.../TestUserGroupMappingPlacementRule.java | 14 +-
.../scheduler/TestSchedulerUtils.java | 1 +
.../capacity/TestCapacityScheduler.java | 6 +-
.../TestCapacitySchedulerAutoQueueCreation.java | 794 +++++++++++++++++++
21 files changed, 1921 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index d042590..5e82f40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -360,13 +360,8 @@ public class RMAppManager implements EventHandler,
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime) throws YarnException {
+
if (!isRecovery) {
- // Do queue mapping
- if (rmContext.getQueuePlacementManager() != null) {
- // We only do queue mapping when it's a new application
- rmContext.getQueuePlacementManager().placeApplication(
- submissionContext, user);
- }
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
new file mode 100644
index 0000000..f2f92b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+/**
+ * Each placement rule when it successfully places an application onto a queue
+ * returns an ApplicationPlacementContext which encapsulates the queue the
+ * application was mapped to and any parent queue for the queue (if configured).
+ */
+public class ApplicationPlacementContext {
+
+ private String queue;
+
+ private String parentQueue;
+
+ public ApplicationPlacementContext(String queue) {
+ this(queue,null);
+ }
+
+ public ApplicationPlacementContext(String queue, String parentQueue) {
+ this.queue = queue;
+ this.parentQueue = parentQueue;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public String getParentQueue() {
+ return parentQueue;
+ }
+
+ public boolean hasParentQueue() {
+ return parentQueue != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index 43a4deb..c006738 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -53,36 +52,33 @@ public class PlacementManager {
}
}
- public void placeApplication(ApplicationSubmissionContext asc, String user)
- throws YarnException {
+ public ApplicationPlacementContext placeApplication(
+ ApplicationSubmissionContext asc, String user) throws YarnException {
+
try {
readLock.lock();
+
if (null == rules || rules.isEmpty()) {
- return;
+ return null;
}
-
- String newQueueName = null;
+
+ ApplicationPlacementContext placement = null;
for (PlacementRule rule : rules) {
- newQueueName = rule.getQueueForApp(asc, user);
- if (newQueueName != null) {
+ placement = rule.getPlacementForApp(asc, user);
+ if (placement != null) {
break;
}
}
-
+
// Failed to get where to place application
- if (null == newQueueName && null == asc.getQueue()) {
- String msg = "Failed to get where to place application="
- + asc.getApplicationId();
+ if (null == placement && null == asc.getQueue()) {
+ String msg = "Failed to get where to place application=" + asc
+ .getApplicationId();
LOG.error(msg);
throw new YarnException(msg);
}
-
- // Set it to ApplicationSubmissionContext
- if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
- LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
- + newQueueName + ", original queue=" + asc.getQueue());
- asc.setQueue(newQueueName);
- }
+
+ return placement;
} finally {
readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
index 47dc48a..a9d5e33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
public abstract class PlacementRule {
+
public String getName() {
return this.getClass().getName();
}
@@ -50,6 +51,6 @@ public abstract class PlacementRule {
* in the {@link PlacementManager} will take care
*
*/
- public abstract String getQueueForApp(ApplicationSubmissionContext asc,
- String user) throws YarnException;
-}
+ public abstract ApplicationPlacementContext getPlacementForApp(
+ ApplicationSubmissionContext asc, String user) throws YarnException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
index d617d16..9901f4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,6 +34,15 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
public class UserGroupMappingPlacementRule extends PlacementRule {
private static final Log LOG = LogFactory
@@ -66,17 +77,41 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
MappingType type;
String source;
String queue;
+ String parentQueue;
+
+ public final static String DELIMITER = ":";
public QueueMapping(MappingType type, String source, String queue) {
this.type = type;
this.source = source;
this.queue = queue;
+ this.parentQueue = null;
}
-
+
+ public QueueMapping(MappingType type, String source,
+ String queue, String parentQueue) {
+ this.type = type;
+ this.source = source;
+ this.queue = queue;
+ this.parentQueue = parentQueue;
+ }
+
public String getQueue() {
return queue;
}
-
+
+ public String getParentQueue() {
+ return parentQueue;
+ }
+
+ public MappingType getType() {
+ return type;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
@Override
public int hashCode() {
return super.hashCode();
@@ -93,6 +128,13 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
return false;
}
}
+
+ public String toString() {
+ return type.toString() + DELIMITER + source + DELIMITER +
+ (parentQueue != null ?
+ parentQueue + "." + queue :
+ queue);
+ }
}
public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
@@ -102,26 +144,27 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
this.groups = groups;
}
- private String getMappedQueue(String user) throws IOException {
+ private ApplicationPlacementContext getPlacementForUser(String user)
+ throws IOException {
for (QueueMapping mapping : mappings) {
if (mapping.type == MappingType.USER) {
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
- return user;
+ return getPlacementContext(mapping, user);
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
- return groups.getGroups(user).get(0);
+ return getPlacementContext(mapping, groups.getGroups(user).get(0));
} else {
- return mapping.queue;
+ return getPlacementContext(mapping);
}
}
if (user.equals(mapping.source)) {
- return mapping.queue;
+ return getPlacementContext(mapping);
}
}
if (mapping.type == MappingType.GROUP) {
for (String userGroups : groups.getGroups(user)) {
if (userGroups.equals(mapping.source)) {
- return mapping.queue;
+ return getPlacementContext(mapping);
}
}
}
@@ -130,13 +173,14 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
}
@Override
- public String getQueueForApp(ApplicationSubmissionContext asc, String user)
+ public ApplicationPlacementContext getPlacementForApp(
+ ApplicationSubmissionContext asc, String user)
throws YarnException {
String queueName = asc.getQueue();
ApplicationId applicationId = asc.getApplicationId();
if (mappings != null && mappings.size() > 0) {
try {
- String mappedQueue = getMappedQueue(user);
+ ApplicationPlacementContext mappedQueue = getPlacementForUser(user);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
@@ -153,10 +197,224 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
throw new YarnException(message);
}
}
-
- return queueName;
+ return null;
}
-
+
+ private ApplicationPlacementContext getPlacementContext(
+ QueueMapping mapping) {
+ return getPlacementContext(mapping, mapping.getQueue());
+ }
+
+ private ApplicationPlacementContext getPlacementContext(QueueMapping mapping,
+ String leafQueueName) {
+ if (!StringUtils.isEmpty(mapping.parentQueue)) {
+ return new ApplicationPlacementContext(leafQueueName,
+ mapping.getParentQueue());
+ } else{
+ return new ApplicationPlacementContext(leafQueueName);
+ }
+ }
+
+ @VisibleForTesting
+ public static UserGroupMappingPlacementRule get(
+ CapacitySchedulerContext schedulerContext) throws IOException {
+ CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
+ boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info(
+ "Initialized queue mappings, override: " + overrideWithQueueMappings);
+
+ List queueMappings = conf.getQueueMappings();
+
+ // Get new user/group mappings
+ List newMappings = new ArrayList<>();
+
+ CapacitySchedulerQueueManager queueManager =
+ schedulerContext.getCapacitySchedulerQueueManager();
+
+ // check if mappings refer to valid queues
+ for (QueueMapping mapping : queueMappings) {
+
+ QueuePath queuePath = extractQueuePath(mapping.getQueue());
+ if (isStaticQueueMapping(mapping)) {
+ //Try getting queue by its leaf queue name
+ // without splitting into parent/leaf queues
+ CSQueue queue = queueManager.getQueue(mapping.getQueue());
+ if (ifQueueDoesNotExist(queue)) {
+ //Try getting the queue by extracting leaf and parent queue names
+ //Assuming its a potential auto created leaf queue
+ queue = queueManager.getQueue(queuePath.getLeafQueue());
+
+ if (ifQueueDoesNotExist(queue)) {
+ //if leaf queue does not exist,
+ // this could be a potential auto created leaf queue
+ //validate if parent queue is specified,
+ // then it should exist and
+ // be an instance of ManagedParentQueue
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping, queuePath);
+ if (newMapping == null) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping
+ .getQueue());
+ }
+ newMappings.add(newMapping);
+ } else{
+ QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
+ queue, mapping, queuePath);
+ newMappings.add(newMapping);
+ }
+ } else{
+ // if queue exists, validate
+ // if its an instance of leaf queue
+ // if its an instance of auto created leaf queue,
+ // then extract parent queue name and update queue mapping
+ QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
+ queue, mapping, queuePath);
+ newMappings.add(newMapping);
+ }
+ } else{
+ //If it is a dynamic queue mapping,
+ // we can safely assume leaf queue name does not have '.' in it
+ // validate
+ // if parent queue is specified, then
+ // parent queue exists and is an instance of ManagedParentQueue
+ //
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping, queuePath);
+ if (newMapping != null) {
+ newMappings.add(newMapping);
+ } else{
+ newMappings.add(mapping);
+ }
+ }
+ }
+
+ // initialize groups if mappings are present
+ if (newMappings.size() > 0) {
+ Groups groups = new Groups(conf);
+ return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
+ newMappings, groups);
+ }
+
+ return null;
+ }
+
+ private static QueueMapping validateAndGetQueueMapping(
+ CapacitySchedulerQueueManager queueManager, CSQueue queue,
+ QueueMapping mapping, QueuePath queuePath) throws IOException {
+ if (!(queue instanceof LeafQueue)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue : " + mapping.getQueue());
+ }
+
+ if (queue instanceof AutoCreatedLeafQueue && queue
+ .getParent() instanceof ManagedParentQueue) {
+
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping, queuePath);
+ if (newMapping == null) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping.getQueue());
+ }
+ return newMapping;
+ }
+ return mapping;
+ }
+
+ private static boolean ifQueueDoesNotExist(CSQueue queue) {
+ return queue == null;
+ }
+
+ private static QueueMapping validateAndGetAutoCreatedQueueMapping(
+ CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
+ QueuePath queuePath) throws IOException {
+ if (queuePath.hasParentQueue()) {
+ //if parent queue is specified,
+ // then it should exist and be an instance of ManagedParentQueue
+ validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()),
+ queuePath.getParentQueue(), queuePath.getLeafQueue());
+ return new QueueMapping(mapping.getType(), mapping.getSource(),
+ queuePath.getLeafQueue(), queuePath.getParentQueue());
+ }
+
+ return null;
+ }
+
+ private static boolean isStaticQueueMapping(QueueMapping mapping) {
+ return !mapping.getQueue().contains(
+ UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mapping
+ .getQueue().contains(
+ UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING);
+ }
+
+ private static class QueuePath {
+
+ public String parentQueue;
+ public String leafQueue;
+
+ public QueuePath(final String leafQueue) {
+ this.leafQueue = leafQueue;
+ }
+
+ public QueuePath(final String parentQueue, final String leafQueue) {
+ this.parentQueue = parentQueue;
+ this.leafQueue = leafQueue;
+ }
+
+ public String getParentQueue() {
+ return parentQueue;
+ }
+
+ public String getLeafQueue() {
+ return leafQueue;
+ }
+
+ public boolean hasParentQueue() {
+ return parentQueue != null;
+ }
+
+ @Override
+ public String toString() {
+ return parentQueue + DOT + leafQueue;
+ }
+ }
+
+ private static QueuePath extractQueuePath(String queueName)
+ throws IOException {
+ int parentQueueNameEndIndex = queueName.lastIndexOf(DOT);
+
+ if (parentQueueNameEndIndex > -1) {
+ final String parentQueue = queueName.substring(0, parentQueueNameEndIndex)
+ .trim();
+ final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1)
+ .trim();
+ return new QueuePath(parentQueue, leafQueue);
+ }
+
+ return new QueuePath(queueName);
+ }
+
+ private static void validateParentQueue(CSQueue parentQueue,
+ String parentQueueName, String leafQueueName) throws IOException {
+ if (parentQueue == null) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue [" + leafQueueName
+ + "] and invalid parent queue [" + parentQueueName + "]");
+ } else if (!(parentQueue instanceof ManagedParentQueue)) {
+ throw new IOException("mapping contains leaf queue [" + leafQueueName
+ + "] and invalid parent queue which "
+ + "does not have auto creation of leaf queues enabled ["
+ + parentQueueName + "]");
+ } else if (!parentQueue.getQueueName().equals(parentQueueName)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue [" + leafQueueName
+ + "] and invalid parent queue "
+ + "which does not match existing leaf queue's parent : ["
+ + parentQueueName + "] does not match [ " + parentQueue
+ .getQueueName() + "]");
+ }
+ }
+
@VisibleForTesting
public List getQueueMappings() {
return mappings;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index ae5f6b4..85d355f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -158,6 +161,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private boolean isNumAttemptsBeyondThreshold = false;
+
+
// Mutable fields
private long startTime;
private long finishTime = 0;
@@ -1073,38 +1078,51 @@ public class RMAppImpl implements RMApp, Recoverable {
app.getUser(),
BuilderUtils.parseTokensConf(app.submissionContext));
} catch (Exception e) {
- String msg = "Failed to fetch user credentials from application:"
- + e.getMessage();
+ String msg = "Failed to fetch user credentials from application:" + e
+ .getMessage();
app.diagnostics.append(msg);
LOG.error(msg, e);
}
}
- for (Map.Entry timeout :
- app.applicationTimeouts.entrySet()) {
+ for (Map.Entry timeout : app.applicationTimeouts
+ .entrySet()) {
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
timeout.getKey(), timeout.getValue());
if (LOG.isDebugEnabled()) {
long remainingTime = timeout.getValue() - app.systemClock.getTime();
LOG.debug("Application " + app.applicationId
+ " is registered for timeout monitor, type=" + timeout.getKey()
- + " remaining timeout="
- + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds");
+ + " remaining timeout=" + (remainingTime > 0 ?
+ remainingTime / 1000 :
+ 0) + " seconds");
}
}
+ ApplicationPlacementContext placementContext = null;
+ try {
+ placementContext = placeApplication(app.rmContext,
+ app.submissionContext, app.user);
+ } catch (Exception e) {
+ String msg = "Failed to place application to queue :" + e.getMessage();
+ app.diagnostics.append(msg);
+ LOG.error(msg, e);
+ }
+
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
- app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
- app.submissionContext, false, app.applicationPriority));
+ app.scheduler.handle(
+ new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
+ app.applicationPriority, placementContext));
return RMAppState.SUBMITTED;
}
// Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers.
- app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
- app.submissionContext, true, app.applicationPriority));
+ app.scheduler.handle(
+ new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
+ app.applicationPriority, placementContext));
// recover attempts
app.recoverAppAttempts();
@@ -1120,8 +1138,20 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.handler.handle(new AppAddedSchedulerEvent(app.user,
- app.submissionContext, false, app.applicationPriority));
+ ApplicationPlacementContext placementContext = null;
+ try {
+ placementContext = placeApplication(app.rmContext,
+ app.submissionContext, app.user);
+ replaceQueueFromPlacementContext(placementContext,
+ app.submissionContext);
+ } catch (YarnException e) {
+ String msg = "Failed to place application to queue :" + e.getMessage();
+ app.diagnostics.append(msg);
+ LOG.error(msg, e);
+ }
+ app.handler.handle(
+ new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
+ app.applicationPriority, placementContext));
// send the ATS create Event
app.sendATSCreateEvent();
}
@@ -2013,4 +2043,37 @@ public class RMAppImpl implements RMApp, Recoverable {
this.submissionContext.setAMContainerSpec(null);
this.submissionContext.setLogAggregationContext(null);
}
+
+ @VisibleForTesting
+ static ApplicationPlacementContext placeApplication(RMContext rmContext,
+ ApplicationSubmissionContext context, String user) throws YarnException {
+
+ ApplicationPlacementContext placementContext = null;
+ PlacementManager placementManager = rmContext.getQueuePlacementManager();
+
+ if (placementManager != null) {
+ placementContext = placementManager.placeApplication(context, user);
+ } else{
+ LOG.error(
+ "Queue Placement Manager is null. Cannot place application :" + " "
+ + context.getApplicationId() + " to queue ");
+ }
+
+ return placementContext;
+ }
+
+ static void replaceQueueFromPlacementContext(
+ ApplicationPlacementContext placementContext,
+ ApplicationSubmissionContext context) {
+ // Set it to ApplicationSubmissionContext
+ //apply queue mapping only to new application submissions
+ if (placementContext != null && !StringUtils.equals(context.getQueue(),
+ placementContext.getQueue())) {
+ LOG.info("Placed application=" + context.getApplicationId() + " to queue="
+ + placementContext.getQueue() + ", original queue=" + context
+ .getQueue());
+ context.setQueue(placementContext.getQueue());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 183cb36..74c85ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -78,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final String queueName;
private final String queuePath;
volatile int numContainers;
-
+
final Resource minimumAllocation;
volatile Resource maximumAllocation;
private volatile QueueState state = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
index b3d1b47..46f5cf1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
@@ -35,31 +35,13 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractManagedParentQueue.class);
- private int maxAppsForAutoCreatedQueues;
- private int maxAppsPerUserForAutoCreatedQueues;
- private int userLimit;
- private float userLimitFactor;
+ protected AutoCreatedLeafQueueTemplate leafQueueTemplate;
public AbstractManagedParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
super.setupQueueConfigs(csContext.getClusterResource());
- initializeLeafQueueConfigs();
-
- StringBuffer queueInfo = new StringBuffer();
- queueInfo.append("Created Managed Parent Queue: ").append(queueName)
- .append("\nof type : [" + getClass())
- .append("]\nwith capacity: [")
- .append(super.getCapacity()).append("]\nwith max capacity: [")
- .append(super.getMaximumCapacity()).append("\nwith max apps: [")
- .append(getMaxApplicationsForAutoCreatedQueues())
- .append("]\nwith max apps per user: [")
- .append(getMaxApplicationsPerUserForAutoCreatedQueues())
- .append("]\nwith user limit: [").append(getUserLimit())
- .append("]\nwith user limit factor: [")
- .append(getUserLimitFactor()).append("].");
- LOG.info(queueInfo.toString());
}
@Override
@@ -71,8 +53,6 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
// Set new configs
setupQueueConfigs(clusterResource);
- initializeLeafQueueConfigs();
-
// run reinitialize on each existing queue, to trigger absolute cap
// recomputations
for (CSQueue res : this.getChildQueues()) {
@@ -87,72 +67,29 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
* Initialize leaf queue configs from template configurations specified on
* parent queue.
*/
- protected void initializeLeafQueueConfigs() {
+ protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
+ (String queuePath) {
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
- final String queuePath = super.getQueuePath();
+ AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new
+ AutoCreatedLeafQueueTemplate.Builder();
int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
if (maxApps < 0) {
maxApps = (int) (
CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
* getAbsoluteCapacity());
}
- userLimit = conf.getUserLimit(queuePath);
- userLimitFactor = conf.getUserLimitFactor(queuePath);
- maxAppsForAutoCreatedQueues = maxApps;
- maxAppsPerUserForAutoCreatedQueues =
- (int) (maxApps * (userLimit / 100.0f) * userLimitFactor);
-
- }
-
- /**
- * Number of maximum applications for each of the auto created leaf queues.
- *
- * @return maxAppsForAutoCreatedQueues
- */
- public int getMaxApplicationsForAutoCreatedQueues() {
- return maxAppsForAutoCreatedQueues;
- }
-
- /**
- * Number of maximum applications per user for each of the auto created
- * leaf queues.
- *
- * @return maxAppsPerUserForAutoCreatedQueues
- */
- public int getMaxApplicationsPerUserForAutoCreatedQueues() {
- return maxAppsPerUserForAutoCreatedQueues;
- }
-
- /**
- * User limit value for each of the auto created leaf queues.
- *
- * @return userLimit
- */
- public int getUserLimitForAutoCreatedQueues() {
- return userLimit;
- }
-
- /**
- * User limit factor value for each of the auto created leaf queues.
- *
- * @return userLimitFactor
- */
- public float getUserLimitFactor() {
- return userLimitFactor;
- }
- public int getMaxAppsForAutoCreatedQueues() {
- return maxAppsForAutoCreatedQueues;
- }
+ int userLimit = conf.getUserLimit(queuePath);
+ float userLimitFactor = conf.getUserLimitFactor(queuePath);
+ leafQueueTemplateBuilder.userLimit(userLimit)
+ .userLimitFactor(userLimitFactor)
+ .maxApps(maxApps)
+ .maxAppsPerUser(
+ (int) (maxApps * (userLimit / 100.0f) * userLimitFactor));
- public int getMaxAppsPerUserForAutoCreatedQueues() {
- return maxAppsPerUserForAutoCreatedQueues;
- }
-
- public int getUserLimit() {
- return userLimit;
+ return leafQueueTemplateBuilder;
}
/**
@@ -229,4 +166,111 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
}
return childQueue;
}
+
+ protected float sumOfChildCapacities() {
+ try {
+ writeLock.lock();
+ float ret = 0;
+ for (CSQueue l : childQueues) {
+ ret += l.getCapacity();
+ }
+ return ret;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ protected float sumOfChildAbsCapacities() {
+ try {
+ writeLock.lock();
+ float ret = 0;
+ for (CSQueue l : childQueues) {
+ ret += l.getAbsoluteCapacity();
+ }
+ return ret;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public static class AutoCreatedLeafQueueTemplate {
+
+ private QueueCapacities queueCapacities;
+
+ private int maxApps;
+ private int maxAppsPerUser;
+ private int userLimit;
+ private float userLimitFactor;
+
+ AutoCreatedLeafQueueTemplate(Builder builder) {
+ this.maxApps = builder.maxApps;
+ this.maxAppsPerUser = builder.maxAppsPerUser;
+ this.userLimit = builder.userLimit;
+ this.userLimitFactor = builder.userLimitFactor;
+ this.queueCapacities = builder.queueCapacities;
+ }
+
+ public static class Builder {
+ private int maxApps;
+ private int maxAppsPerUser;
+
+ private int userLimit;
+ private float userLimitFactor;
+
+ private QueueCapacities queueCapacities;
+
+ Builder maxApps(int maxApplications) {
+ this.maxApps = maxApplications;
+ return this;
+ }
+
+ Builder maxAppsPerUser(int maxApplicationsPerUser) {
+ this.maxAppsPerUser = maxApplicationsPerUser;
+ return this;
+ }
+
+ Builder userLimit(int usrLimit) {
+ this.userLimit = usrLimit;
+ return this;
+ }
+
+ Builder userLimitFactor(float ulf) {
+ this.userLimitFactor = ulf;
+ return this;
+ }
+
+ Builder capacities(QueueCapacities capacities) {
+ this.queueCapacities = capacities;
+ return this;
+ }
+
+ AutoCreatedLeafQueueTemplate build() {
+ return new AutoCreatedLeafQueueTemplate(this);
+ }
+ }
+
+ public int getUserLimit() {
+ return userLimit;
+ }
+
+ public float getUserLimitFactor() {
+ return userLimitFactor;
+ }
+
+ public QueueCapacities getQueueCapacities() {
+ return queueCapacities;
+ }
+
+ public int getMaxApps() {
+ return maxApps;
+ }
+
+ public int getMaxAppsPerUser() {
+ return maxAppsPerUser;
+ }
+ }
+
+ public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() {
+ return leafQueueTemplate;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 4eb7cdd..bc206d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+ .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,17 +44,18 @@ public class AutoCreatedLeafQueue extends LeafQueue {
AbstractManagedParentQueue parent) throws IOException {
super(cs, queueName, parent, null);
- updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
- parent.getUserLimitFactor(),
- parent.getMaxApplicationsForAutoCreatedQueues(),
- parent.getMaxApplicationsPerUserForAutoCreatedQueues());
-
+ AutoCreatedLeafQueueTemplate leafQueueTemplate =
+ parent.getLeafQueueTemplate();
+ updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
+ leafQueueTemplate.getUserLimitFactor(),
+ leafQueueTemplate.getMaxApps(),
+ leafQueueTemplate.getMaxAppsPerUser());
this.parent = parent;
}
@Override
- public void reinitialize(CSQueue newlyParsedQueue,
- Resource clusterResource) throws IOException {
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+ throws IOException {
try {
writeLock.lock();
@@ -62,10 +65,12 @@ public class AutoCreatedLeafQueue extends LeafQueue {
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
- updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
- parent.getUserLimitFactor(),
- parent.getMaxApplicationsForAutoCreatedQueues(),
- parent.getMaxApplicationsPerUserForAutoCreatedQueues());
+ AutoCreatedLeafQueueTemplate leafQueueTemplate =
+ parent.getLeafQueueTemplate();
+ updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
+ leafQueueTemplate.getUserLimitFactor(),
+ leafQueueTemplate.getMaxApps(),
+ leafQueueTemplate.getMaxAppsPerUser());
} finally {
writeLock.unlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6f630f8..ed30ad1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -64,10 +63,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@@ -146,6 +145,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QUEUE_MAPPING;
+
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
@@ -560,44 +561,17 @@ public class CapacityScheduler extends
}
@VisibleForTesting
- public UserGroupMappingPlacementRule
- getUserGroupMappingPlacementRule() throws IOException {
+ public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
try {
readLock.lock();
- boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
- LOG.info(
- "Initialized queue mappings, override: " + overrideWithQueueMappings);
-
- // Get new user/group mappings
- List newMappings = conf.getQueueMappings();
- // check if mappings refer to valid queues
- for (QueueMapping mapping : newMappings) {
- String mappingQueue = mapping.getQueue();
- if (!mappingQueue.equals(
- UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
- .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
- CSQueue queue = getQueue(mappingQueue);
- if (queue == null || !(queue instanceof LeafQueue)) {
- throw new IOException(
- "mapping contains invalid or non-leaf queue " + mappingQueue);
- }
- }
- }
-
- // initialize groups if mappings are present
- if (newMappings.size() > 0) {
- Groups groups = new Groups(conf);
- return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
- newMappings, groups);
- }
-
- return null;
+ return UserGroupMappingPlacementRule.get(this);
} finally {
readLock.unlock();
}
}
- private void updatePlacementRules() throws IOException {
+ @VisibleForTesting
+ void updatePlacementRules() throws IOException {
// Initialize placement rules
Collection placementRuleStrs = conf.getStringCollection(
YarnConfiguration.QUEUE_PLACEMENT_RULES);
@@ -731,37 +705,92 @@ public class CapacityScheduler extends
}
}
- private void addApplication(ApplicationId applicationId,
- String queueName, String user, Priority priority) {
+ private void addApplication(ApplicationId applicationId, String queueName,
+ String user, Priority priority,
+ ApplicationPlacementContext placementContext) {
try {
writeLock.lock();
if (isSystemAppsLimitReached()) {
String message = "Maximum system application limit reached,"
+ "cannot accept submission of application: " + applicationId;
- this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
- applicationId, RMAppEventType.APP_REJECTED, message));
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
return;
}
// Sanity checks.
CSQueue queue = getQueue(queueName);
+
+ if (queue == null && placementContext != null) {
+ //Could be a potential auto-created leaf queue
+ try {
+ queue = autoCreateLeafQueue(placementContext);
+ } catch (YarnException | IOException e) {
+ LOG.error("Could not auto-create leaf queue due to : ", e);
+ final String message =
+ "Application " + applicationId + " submission by user : " + user
+ + " to queue : " + queueName + " failed : " + e.getMessage();
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ }
+ }
+
if (queue == null) {
- String message =
+ final String message =
"Application " + applicationId + " submitted by user " + user
+ " to unknown queue: " + queueName;
+
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
}
+
if (!(queue instanceof LeafQueue)) {
String message =
- "Application " + applicationId + " submitted by user " + user
- + " to non-leaf queue: " + queueName;
+ "Application " + applicationId + " submitted by user : " + user
+ + " to non-leaf queue : " + queueName;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
+ } else if (queue instanceof AutoCreatedLeafQueue && queue
+ .getParent() instanceof ManagedParentQueue) {
+
+ //If queue already exists and auto-queue creation was not required,
+ //placement context should not be null
+ if (placementContext == null) {
+ String message =
+ "Application " + applicationId + " submission by user : " + user
+ + " to specified queue : " + queueName + " is prohibited. "
+ + "Verify automatic queue mapping for user exists in " +
+ QUEUE_MAPPING;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return;
+ // For a queue which exists already and
+ // not auto-created above, then its parent queue should match
+ // the parent queue specified in queue mapping
+ } else if (!queue.getParent().getQueueName().equals(
+ placementContext.getParentQueue())) {
+ String message =
+ "Auto created Leaf queue " + placementContext.getQueue() + " "
+ + "already exists under queue : " + queue
+ .getParent().getQueuePath()
+ + ".But Queue mapping configuration " +
+ CapacitySchedulerConfiguration.QUEUE_MAPPING + " has been "
+ + "updated to a different parent queue : "
+ + placementContext.getParentQueue()
+ + " for the specified user : " + user;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return;
+ }
}
+
// Submit to the queue
try {
queue.submitApplication(applicationId, user, queueName);
@@ -1483,7 +1512,8 @@ public class CapacityScheduler extends
if (queueName != null) {
if (!appAddedEvent.getIsAppRecovering()) {
addApplication(appAddedEvent.getApplicationId(), queueName,
- appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
+ appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
+ appAddedEvent.getPlacementContext());
} else {
addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
@@ -2001,7 +2031,8 @@ public class CapacityScheduler extends
try {
writeLock.lock();
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
- ParentQueue parent = (ParentQueue) queue.getParent();
+ AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue
+ .getParent();
if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
@@ -2010,7 +2041,8 @@ public class CapacityScheduler extends
}
if (parent == null
- || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) {
+ || !(AbstractManagedParentQueue.class.isAssignableFrom(
+ parent.getClass()))) {
throw new SchedulerDynamicEditException(
"The parent of AutoCreatedLeafQueue " + inQueue
+ " must be a PlanQueue/ManagedParentQueue");
@@ -2655,4 +2687,43 @@ public class CapacityScheduler extends
}
return null;
}
+
+ private LeafQueue autoCreateLeafQueue(
+ ApplicationPlacementContext placementContext)
+ throws IOException, YarnException {
+
+ AutoCreatedLeafQueue autoCreatedLeafQueue = null;
+
+ String leafQueueName = placementContext.getQueue();
+ String parentQueueName = placementContext.getParentQueue();
+
+ if (!StringUtils.isEmpty(parentQueueName)) {
+ CSQueue parentQueue = getQueue(parentQueueName);
+
+ if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(
+ parentQueue.getQueuePath())) {
+
+ ManagedParentQueue autoCreateEnabledParentQueue =
+ (ManagedParentQueue) parentQueue;
+ autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
+ autoCreateEnabledParentQueue);
+
+ addQueue(autoCreatedLeafQueue);
+
+ //TODO - Set entitlement through capacity management policy
+ } else{
+ throw new SchedulerDynamicEditException(
+ "Could not auto-create leaf queue for " + leafQueueName
+ + ". Queue mapping specifies an invalid parent queue "
+ + "which does not exist "
+ + parentQueueName);
+ }
+ } else{
+ throw new SchedulerDynamicEditException(
+ "Could not auto-create leaf queue for " + leafQueueName
+ + ". Queue mapping does not specify"
+ + " which parent queue it needs to be created under.");
+ }
+ return autoCreatedLeafQueue;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index bfead35..4515453 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -907,6 +907,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
}
+ @Private
+ @VisibleForTesting
+ public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
+ setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
+ }
+
/**
* Returns a collection of strings, trimming leading and trailing whitespeace
* on each value
@@ -981,6 +987,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return mappings;
}
+ @Private
+ @VisibleForTesting
+ public void setQueuePlacementRules(Collection queuePlacementRules) {
+ if (queuePlacementRules == null) {
+ return;
+ }
+ String str = StringUtils.join(",", queuePlacementRules);
+ setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str);
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setQueueMappings(List queueMappings) {
+ if (queueMappings == null) {
+ return;
+ }
+
+ List queueMappingStrs = new ArrayList<>();
+ for (QueueMapping mapping : queueMappings) {
+ queueMappingStrs.add(mapping.toString());
+ }
+
+ setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
+ }
+
public boolean isReservable(String queue) {
boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
@@ -1523,4 +1554,126 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
}
+
+ @Private
+ public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
+
+ @Private
+ public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
+ "auto-create-child-queue.enabled";
+
+ @Private
+ public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
+ "leaf-queue-template";
+
+ @Private
+ public static final String AUTO_CREATE_QUEUE_MAX_QUEUES =
+ "auto-create-child-queue.max-queues";
+
+ @Private
+ public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000;
+
+ /**
+ * If true, this queue will be created as a Parent Queue which auto-creates
+ * leaf child queues.
+ *
+ * @param queuePath The queue's path
+ * @return true if auto create is enabled for child queues else false. Default
+ * is false
+ */
+ @Private
+ public boolean isAutoCreateChildQueueEnabled(String queuePath) {
+ boolean isAutoCreateEnabled = getBoolean(
+ getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED,
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED);
+ return isAutoCreateEnabled;
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setAutoCreateChildQueueEnabled(String queuePath,
+ boolean autoCreationEnabled) {
+ setBoolean(getQueuePrefix(queuePath) +
+ AUTO_CREATE_CHILD_QUEUE_ENABLED,
+ autoCreationEnabled);
+ }
+
+ /**
+ * Get the auto created leaf queue's template configuration prefix
+ * Leaf queue's template capacities are configured at the parent queue
+ *
+ * @param queuePath parent queue's path
+ * @return Config prefix for leaf queue template configurations
+ */
+ @Private
+ public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
+ return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
+ }
+
+ @Private
+ public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
+ "auto-create-child-queue.fail-on-exceeding-parent-capacity";
+
+ @Private
+ public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
+ false;
+
+ /**
+ * Fail further auto leaf queue creation when parent's guaranteed capacity is
+ * exceeded.
+ *
+ * @param queuePath the parent queue's path
+ * @return true if configured to fail else false
+ */
+ @Private
+ public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+ String queuePath) {
+ boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity =
+ getBoolean(getQueuePrefix(queuePath)
+ + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
+ DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY);
+ return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity;
+ }
+
+ @VisibleForTesting
+ @Private
+ public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+ String queuePath, boolean autoCreationEnabled) {
+ setBoolean(
+ getQueuePrefix(queuePath) +
+ FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
+ autoCreationEnabled);
+ }
+
+ /**
+ * Get the max number of leaf queues that are allowed to be created under
+ * a parent queue
+ *
+ * @param queuePath the parent queue's path
+ * @return the max number of leaf queues allowed to be auto created
+ */
+ @Private
+ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) {
+ return getInt(getQueuePrefix(queuePath) +
+ AUTO_CREATE_QUEUE_MAX_QUEUES,
+ DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES);
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath,
+ float val) {
+ String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+ queuePath);
+ setCapacity(leafQueueConfPrefix, val);
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
+ float val) {
+ String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+ queuePath);
+ setMaximumCapacity(leafQueueConfPrefix, val);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 7be2529..eb50123 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -154,7 +154,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
* @throws IOException if fails to initialize queues
*/
public void initializeQueues(CapacitySchedulerConfiguration conf)
- throws IOException {
+ throws IOException {
root = parseQueue(this.csContext, conf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, appPriorityACLManager, queues);
@@ -176,7 +176,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
if (!csContext.isConfigurationMutable() ||
csContext.getRMContext().getHAServiceState()
!= HAServiceProtocol.HAServiceState.STANDBY) {
- // Ensure queue hiearchy in the new XML file is proper.
+ // Ensure queue hierarchy in the new XML file is proper.
validateQueueHierarchy(queues, newQueues);
}
@@ -216,11 +216,13 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
Map oldQueues,
QueueHook hook) throws IOException {
CSQueue queue;
- String fullQueueName =
- (parent == null) ? queueName
- : (parent.getQueuePath() + "." + queueName);
+ String fullQueueName = (parent == null) ?
+ queueName :
+ (parent.getQueuePath() + "." + queueName);
String[] childQueueNames = conf.getQueues(fullQueueName);
boolean isReservableQueue = conf.isReservable(fullQueueName);
+ boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(
+ fullQueueName);
if (childQueueNames == null || childQueueNames.length == 0) {
if (null == parent) {
throw new IllegalStateException(
@@ -229,9 +231,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
// Check if the queue will be dynamically managed by the Reservation
// system
if (isReservableQueue) {
- queue =
- new PlanQueue(csContext, queueName, parent,
- oldQueues.get(queueName));
+ queue = new PlanQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
//initializing the "internal" default queue, for SLS compatibility
String defReservationId =
@@ -249,38 +250,46 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
((PlanQueue) queue).setChildQueues(childQueues);
queues.put(defReservationId, resQueue);
- } else {
- queue =
- new LeafQueue(csContext, queueName, parent,
- oldQueues.get(queueName));
+ } else if (isAutoCreateEnabled) {
+ queue = new ManagedParentQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+ } else{
+ queue = new LeafQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(queue);
}
- } else {
+ } else{
if (isReservableQueue) {
throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName);
}
- ParentQueue parentQueue =
- new ParentQueue(csContext, queueName, parent,
- oldQueues.get(queueName));
+
+ ParentQueue parentQueue;
+ if (isAutoCreateEnabled) {
+ parentQueue = new ManagedParentQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+ } else{
+ parentQueue = new ParentQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+ }
// Used only for unit tests
queue = hook.hook(parentQueue);
List childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) {
- CSQueue childQueue =
- parseQueue(csContext, conf, queue, childQueueName,
- queues, oldQueues, hook);
+ CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName,
+ queues, oldQueues, hook);
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
+
}
- if (queue instanceof LeafQueue && queues.containsKey(queueName)
- && queues.get(queueName) instanceof LeafQueue) {
+ if (queue instanceof LeafQueue && queues.containsKey(queueName) && queues
+ .get(queueName) instanceof LeafQueue) {
throw new IOException("Two leaf queues were named " + queueName
+ ". Leaf queue names must be distinct");
}
@@ -312,27 +321,46 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Deleting Queue " + queueName + ", as it is not"
+ " present in the modified capacity configuration xml");
- } else {
+ } else{
throw new IOException(oldQueue.getQueuePath() + " is deleted from"
+ " the new capacity scheduler configuration, but the"
- + " queue is not yet in stopped state. "
- + "Current State : " + oldQueue.getState());
+ + " queue is not yet in stopped state. " + "Current State : "
+ + oldQueue.getState());
}
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
//Queue's cannot be moved from one hierarchy to other
- throw new IOException(queueName + " is moved from:"
- + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
- + " after refresh, which is not allowed.");
- } else if (oldQueue instanceof LeafQueue
+ throw new IOException(
+ queueName + " is moved from:" + oldQueue.getQueuePath() + " to:"
+ + newQueue.getQueuePath()
+ + " after refresh, which is not allowed.");
+ } else if (oldQueue instanceof ParentQueue
+ && !(oldQueue instanceof ManagedParentQueue)
+ && newQueue instanceof ManagedParentQueue) {
+ throw new IOException(
+ "Can not convert parent queue: " + oldQueue.getQueuePath()
+ + " to auto create enabled parent queue since "
+ + "it could have other pre-configured queues which is not "
+ + "supported");
+ } else if (oldQueue instanceof ManagedParentQueue
+ && !(newQueue instanceof ManagedParentQueue)) {
+ throw new IOException(
+ "Cannot convert auto create enabled parent queue: " + oldQueue
+ .getQueuePath() + " to leaf queue. Please check "
+ + " parent queue's configuration "
+ + CapacitySchedulerConfiguration
+ .AUTO_CREATE_CHILD_QUEUE_ENABLED
+ + " is set to true");
+ } else if (oldQueue instanceof LeafQueue
&& newQueue instanceof ParentQueue) {
if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue.");
- } else {
- throw new IOException("Can not convert the leaf queue: "
- + oldQueue.getQueuePath() + " to parent queue since "
- + "it is not yet in stopped state. Current State : "
- + oldQueue.getState());
+ } else{
+ throw new IOException(
+ "Can not convert the leaf queue: " + oldQueue.getQueuePath()
+ + " to parent queue since "
+ + "it is not yet in stopped state. Current State : "
+ + oldQueue.getState());
}
} else if (oldQueue instanceof ParentQueue
&& newQueue instanceof LeafQueue) {
@@ -352,6 +380,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
*/
private void updateQueues(Map existingQueues,
Map newQueues) {
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
for (Map.Entry e : newQueues.entrySet()) {
String queueName = e.getKey();
CSQueue queue = e.getValue();
@@ -363,7 +392,13 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
.iterator(); itr.hasNext();) {
Map.Entry e = itr.next();
String queueName = e.getKey();
- if (!newQueues.containsKey(queueName)) {
+ CSQueue existingQueue = e.getValue();
+
+ //TODO - Handle case when auto create is disabled on parent queues
+ if (!newQueues.containsKey(queueName) && !(
+ existingQueue instanceof AutoCreatedLeafQueue && conf
+ .isAutoCreateChildQueueEnabled(
+ existingQueue.getParent().getQueuePath()))) {
itr.remove();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
new file mode 100644
index 0000000..ff795e4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Auto-creation enabled parent queue. This queue initially does not have any
+ * children; all of its child leaf queues are auto created. Currently this
+ * does not allow other pre-configured leaf or parent queues to co-exist along
+ * with auto-created leaf queues. Auto creation is currently limited to leaf
+ * queues.
+ */
+public class ManagedParentQueue extends AbstractManagedParentQueue {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ManagedParentQueue.class);
+
+  // Re-read from configuration every time the leaf queue template is
+  // initialized; consulted by addChildQueue.
+  private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;
+
+  /**
+   * Creates the queue, builds the leaf queue template from the current
+   * scheduler configuration and logs a summary of the resulting settings.
+   *
+   * @param cs scheduler context, passed through to the superclass
+   * @param queueName name of this queue
+   * @param parent parent queue, passed through to the superclass
+   * @param old previous instance of this queue, passed through to the
+   *          superclass
+   * @throws IOException propagated from queue initialization
+   */
+  public ManagedParentQueue(final CapacitySchedulerContext cs,
+      final String queueName, final CSQueue parent, final CSQueue old)
+      throws IOException {
+    super(cs, queueName, parent, old);
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+        csContext.getConfiguration());
+    this.leafQueueTemplate = initializeLeafQueueConfigs(
+        leafQueueTemplateConfPrefix).build();
+
+    // Local, single-threaded buffer: StringBuilder is sufficient. Every value
+    // is wrapped in a balanced [..] pair.
+    StringBuilder queueInfo = new StringBuilder();
+    queueInfo.append("Created Managed Parent Queue: [").append(queueName)
+        .append("]\nwith capacity: [").append(super.getCapacity())
+        .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
+        .append("]\nwith max apps: [").append(leafQueueTemplate.getMaxApps())
+        .append("]\nwith max apps per user: [")
+        .append(leafQueueTemplate.getMaxAppsPerUser())
+        .append("]\nwith user limit: [")
+        .append(leafQueueTemplate.getUserLimit())
+        .append("]\nwith user limit factor: [")
+        .append(leafQueueTemplate.getUserLimitFactor()).append("].");
+    LOG.info(queueInfo.toString());
+  }
+
+  /**
+   * Validates the newly parsed queue, delegates to the superclass and then
+   * rebuilds the leaf queue template from the current configuration.
+   *
+   * @param newlyParsedQueue queue parsed from the refreshed configuration
+   * @param clusterResource cluster resource, passed through to the superclass
+   * @throws IOException if {@code newlyParsedQueue} fails {@link #validate}
+   *           or the superclass reinitialization fails
+   */
+  @Override
+  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+      throws IOException {
+    validate(newlyParsedQueue);
+    super.reinitialize(newlyParsedQueue, clusterResource);
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+        csContext.getConfiguration());
+    this.leafQueueTemplate = initializeLeafQueueConfigs(
+        leafQueueTemplateConfPrefix).build();
+  }
+
+  /**
+   * Extends the superclass template builder with the capacities configured
+   * under this queue's leaf queue template prefix, and refreshes the
+   * fail-on-exceeding-guaranteed-capacity flag for this queue.
+   *
+   * @param queuePath configuration path passed to the superclass
+   * @return the template builder with capacities populated
+   */
+  @Override
+  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs(
+      String queuePath) {
+
+    AutoCreatedLeafQueueTemplate.Builder templateBuilder =
+        super.initializeLeafQueueConfigs(queuePath);
+
+    // Read the configuration once so that all lookups below are made against
+    // the same object.
+    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf);
+    QueueCapacities queueCapacities = new QueueCapacities(false);
+    CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix,
+        conf, queueCapacities, getQueueCapacities());
+    templateBuilder.capacities(queueCapacities);
+
+    shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
+        conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+            getQueuePath());
+
+    return templateBuilder;
+  }
+
+  /**
+   * Checks that the newly parsed queue is a {@code ManagedParentQueue} with
+   * the same queue path as this queue.
+   *
+   * @param newlyParsedQueue queue parsed from the refreshed configuration
+   * @throws IOException if the type or the queue path does not match
+   */
+  protected void validate(final CSQueue newlyParsedQueue) throws IOException {
+    // Sanity check
+    if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
+        .getQueuePath().equals(getQueuePath())) {
+      throw new IOException(
+          "Trying to reinitialize " + getQueuePath() + " from "
+              + newlyParsedQueue.getQueuePath());
+    }
+  }
+
+  /**
+   * Adds an auto-created leaf queue after checking the configured maximum
+   * number of child queues and, when enabled, that the template capacity
+   * still fits into the parent's absolute capacity.
+   *
+   * @param childQueue queue to add; must be an {@code AutoCreatedLeafQueue}
+   * @throws SchedulerDynamicEditException if {@code childQueue} is null or
+   *           not an {@code AutoCreatedLeafQueue}, if the child queue limit
+   *           has been reached, or if the capacity check fails
+   */
+  @Override
+  public void addChildQueue(CSQueue childQueue)
+      throws SchedulerDynamicEditException {
+    // Acquire the lock before entering try so that the finally block never
+    // unlocks a lock that was not obtained.
+    writeLock.lock();
+    try {
+      // instanceof is false for null, so this also rejects a null child.
+      if (!(childQueue instanceof AutoCreatedLeafQueue)) {
+        throw new SchedulerDynamicEditException(
+            "Expected child queue to be an instance of AutoCreatedLeafQueue");
+      }
+
+      CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+      ManagedParentQueue parentQueue =
+          (ManagedParentQueue) childQueue.getParent();
+
+      String leafQueueName = childQueue.getQueueName();
+      int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
+          parentQueue.getQueuePath());
+      int currentQueues = parentQueue.getChildQueues().size();
+
+      if (currentQueues >= maxQueues) {
+        throw new SchedulerDynamicEditException(
+            "Cannot auto create leaf queue " + leafQueueName + ".Max Child "
+                + "Queue limit exceeded which is configured as : " + maxQueues
+                + " and number of child queues is : " + currentQueues);
+      }
+
+      if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
+        if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
+            + parentQueue.sumOfChildAbsCapacities() > parentQueue
+            .getAbsoluteCapacity()) {
+          throw new SchedulerDynamicEditException(
+              "Cannot auto create leaf queue " + leafQueueName + ". Child "
+                  + "queues capacities have reached parent queue : "
+                  + parentQueue.getQueuePath() + " guaranteed capacity");
+        }
+      }
+
+      AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+      super.addChildQueue(leafQueue);
+      //TODO - refresh policy queue after capacity management is added
+
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the configuration prefix under which the leaf queue template for
+   * this queue's path is defined.
+   */
+  private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
+    return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index d61951b..959ca51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -1081,17 +1081,4 @@ public class ParentQueue extends AbstractCSQueue {
public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy;
}
-
- protected float sumOfChildCapacities() {
- try {
- writeLock.lock();
- float ret = 0;
- for (CSQueue l : childQueues) {
- ret += l.getCapacity();
- }
- return ret;
- } finally {
- writeLock.unlock();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 4ab2e9f..b7f8aa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -40,6 +40,19 @@ public class PlanQueue extends AbstractManagedParentQueue {
+  /**
+   * Creates the plan queue, builds the leaf queue template from the
+   * configuration at this queue's path and logs a summary of the resulting
+   * settings.
+   *
+   * @throws IOException propagated from queue initialization
+   */
public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
+    this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
+
+    // Local, single-threaded buffer: StringBuilder is sufficient. Every value
+    // is wrapped in a balanced [..] pair.
+    StringBuilder queueInfo = new StringBuilder();
+    queueInfo.append("Created Plan Queue: [").append(queueName)
+        .append("]\nwith capacity: [").append(super.getCapacity())
+        .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
+        .append("]\nwith max apps: [").append(leafQueueTemplate.getMaxApps())
+        .append("]\nwith max apps per user: [")
+        .append(leafQueueTemplate.getMaxAppsPerUser())
+        .append("]\nwith user limit: [")
+        .append(leafQueueTemplate.getUserLimit())
+        .append("]\nwith user limit factor: [")
+        .append(leafQueueTemplate.getUserLimitFactor()).append("].");
+    LOG.info(queueInfo.toString());
}
@Override
@@ -47,17 +60,21 @@ public class PlanQueue extends AbstractManagedParentQueue {
throws IOException {
validate(newlyParsedQueue);
super.reinitialize(newlyParsedQueue, clusterResource);
+ this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
}
+  /**
+   * Obtains the leaf queue template builder for the given queue path from the
+   * superclass and refreshes {@code showReservationsAsQueues} from the
+   * scheduler configuration.
+   *
+   * @param queuePath queue path passed to the superclass and used to look up
+   *          the show-reservations-as-queues setting
+   * @return the template builder returned by the superclass
+   */
@Override
- protected void initializeLeafQueueConfigs() {
- String queuePath = super.getQueuePath();
+  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs(
+      String queuePath) {
+    AutoCreatedLeafQueueTemplate.Builder templateBuilder =
+        super.initializeLeafQueueConfigs(queuePath);
showReservationsAsQueues = csContext.getConfiguration()
.getShowReservationAsQueues(queuePath);
- super.initializeLeafQueueConfigs();
+    return templateBuilder;
}
- private void validate(final CSQueue newlyParsedQueue) throws IOException {
+ protected void validate(final CSQueue newlyParsedQueue) throws IOException {
// Sanity check
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org