Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C321F200C3C for ; Mon, 3 Apr 2017 19:17:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C1B49160B8F; Mon, 3 Apr 2017 17:17:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA0DC160B8D for ; Mon, 3 Apr 2017 19:17:25 +0200 (CEST) Received: (qmail 78948 invoked by uid 500); 3 Apr 2017 17:17:25 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 78934 invoked by uid 99); 3 Apr 2017 17:17:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Apr 2017 17:17:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9515DFC8E; Mon, 3 Apr 2017 17:17:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-5952. Create REST API for changing YARN scheduler configurations. (Jonathan Hung via wangda) [Forced Update!] Date: Mon, 3 Apr 2017 17:17:24 +0000 (UTC) archived-at: Mon, 03 Apr 2017 17:17:27 -0000 Repository: hadoop Updated Branches: refs/heads/YARN-5734 663f30a25 -> 94f81b6f6 (forced update) YARN-5952. Create REST API for changing YARN scheduler configurations. 
(Jonathan Hung via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94f81b6f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94f81b6f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94f81b6f Branch: refs/heads/YARN-5734 Commit: 94f81b6f6357573d8a2b3efa35ea69711ca6fec0 Parents: 25d2028 Author: Wangda Tan Authored: Mon Apr 3 10:12:01 2017 -0700 Committer: Wangda Tan Committed: Mon Apr 3 10:12:01 2017 -0700 ---------------------------------------------------------------------- .../scheduler/MutableConfScheduler.java | 40 ++ .../scheduler/MutableConfigurationProvider.java | 5 +- .../scheduler/capacity/CapacityScheduler.java | 16 +- .../conf/InMemoryConfigurationStore.java | 6 +- .../conf/MutableCSConfigurationProvider.java | 24 +- .../resourcemanager/webapp/RMWebServices.java | 169 +++++++ .../webapp/dao/QueueConfigInfo.java | 57 +++ .../webapp/dao/QueueConfigsUpdateInfo.java | 60 +++ .../TestMutableCSConfigurationProvider.java | 6 +- .../TestRMWebServicesConfigurationMutation.java | 477 +++++++++++++++++++ 10 files changed, 849 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java new file mode 100644 index 0000000..35e36e1 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.util.Map; + +/** + * Interface for a scheduler that supports changing configuration at runtime. + * + */ +public interface MutableConfScheduler extends ResourceScheduler { + + /** + * Update the scheduler's configuration. 
+ * @param user Caller of this update + * @param confUpdate key-value map of the configuration update + * @throws IOException if update is invalid + */ + void updateConfiguration(UserGroupInformation user, + Map confUpdate) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index da30a2b..889c3bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.Map; /** @@ -29,7 +30,9 @@ public interface MutableConfigurationProvider { * Update the scheduler configuration with the provided key value pairs. * @param user User issuing the request * @param confUpdate Key-value pairs for configurations to be updated. 
+ * @throws IOException if scheduler could not be reinitialized */ - void mutateConfiguration(String user, Map confUpdate); + void mutateConfiguration(String user, Map confUpdate) + throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7362688..e027f5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -149,7 +151,7 @@ import com.google.common.util.concurrent.SettableFuture; public class CapacityScheduler extends AbstractYarnScheduler implements PreemptableResourceScheduler, CapacitySchedulerContext, Configurable, - ResourceAllocationCommitter { + ResourceAllocationCommitter, MutableConfScheduler { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); @@ -2486,4 +2488,16 @@ public class CapacityScheduler extends writeLock.unlock(); } } + + @Override + public void updateConfiguration(UserGroupInformation user, + Map confUpdate) throws IOException { + if (csConfProvider instanceof MutableConfigurationProvider) { + ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( + user.getShortUserName(), confUpdate); + } else { + throw new UnsupportedOperationException("Configured CS configuration " + + "provider does not support updating configuration."); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index a208fb9..b97be1b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -58,7 +58,11 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore { if (isValid) { Map mutations = mutation.getUpdates(); for (Map.Entry kv : mutations.entrySet()) { - schedConf.set(kv.getKey(), kv.getValue()); + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); + } } } return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 267ab6a..ea1b3c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -60,34 +60,44 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } Configuration initialSchedConf = new Configuration(false); initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE); - this.schedConf = initialSchedConf; - confStore.initialize(config, initialSchedConf); + this.schedConf = new Configuration(false); + // We need to explicitly set the key-values in schedConf, otherwise + // these configuration keys cannot be deleted when + // configuration is reloaded. + for (Map.Entry kv : initialSchedConf) { + schedConf.set(kv.getKey(), kv.getValue()); + } + confStore.initialize(config, schedConf); this.conf = config; } @Override public CapacitySchedulerConfiguration loadConfiguration(Configuration configuration) throws IOException { - Configuration loadedConf = new Configuration(configuration); - loadedConf.addResource(schedConf); + Configuration loadedConf = new Configuration(schedConf); + loadedConf.addResource(configuration); return new CapacitySchedulerConfiguration(loadedConf, false); } @Override public void mutateConfiguration(String user, - Map confUpdate) { + Map confUpdate) throws IOException { Configuration oldConf = new Configuration(schedConf); LogMutation log = new LogMutation(confUpdate, user); long id = confStore.logMutation(log); for (Map.Entry kv : confUpdate.entrySet()) { - schedConf.set(kv.getKey(), kv.getValue()); + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); + } } try { rmContext.getScheduler().reinitialize(conf, rmContext); } catch (IOException e) { schedConf = oldConf; confStore.confirmMutation(id, false); - return; + throw e; } confStore.confirmMutation(id, true); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index bd0602b..78eb4de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -57,6 +57,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import com.google.common.base.Joiner; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -136,11 +137,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -2614,4 +2617,170 @@ public class RMWebServices extends WebServices { app.getApplicationTimeouts().get(appTimeout.getTimeoutType())); return Response.status(Status.OK).entity(timeout).build(); } + + @PUT + @Path("/queues") + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response updateSchedulerConfiguration(QueueConfigsUpdateInfo + mutationInfo, @Context HttpServletRequest hsr) + throws AuthorizationException, InterruptedException { + init(); + + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + ApplicationACLsManager aclsManager = rm.getApplicationACLsManager(); + if (aclsManager.areACLsEnabled()) { + if (callerUGI == null || !aclsManager.isAdmin(callerUGI)) { + String msg = "Only admins can carry out this operation."; + throw new ForbiddenException(msg); + } + } + + ResourceScheduler scheduler = rm.getResourceScheduler(); + if (scheduler instanceof MutableConfScheduler) { + try { + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException, YarnException { + Map confUpdate = + constructKeyValueConfUpdate(mutationInfo); + ((CapacityScheduler) scheduler).updateConfiguration(callerUGI, + confUpdate); + return null; + } + }); + } catch 
(IOException e) { + return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) + .build(); + } + return Response.status(Status.OK).entity("Configuration change " + + "successfully applied.").build(); + } else { + return Response.status(Status.BAD_REQUEST) + .entity("Configuration change only supported by CapacityScheduler.") + .build(); + } + } + + private Map constructKeyValueConfUpdate( + QueueConfigsUpdateInfo mutationInfo) throws IOException { + CapacitySchedulerConfiguration currentConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(currentConf, false); + Map confUpdate = new HashMap<>(); + for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { + removeQueue(queueToRemove, proposedConf, confUpdate); + } + for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { + addQueue(addQueueInfo, proposedConf, confUpdate); + } + for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { + updateQueue(updateQueueInfo, proposedConf, confUpdate); + } + return confUpdate; + } + + private void removeQueue( + String queueToRemove, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (queueToRemove == null) { + return; + } else { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + String queueName = queueToRemove.substring( + queueToRemove.lastIndexOf('.') + 1); + CSQueue queue = cs.getQueue(queueName); + if (queue == null || + !queue.getQueuePath().equals(queueToRemove)) { + throw new IOException("Queue " + queueToRemove + " not found"); + } else if (queueToRemove.lastIndexOf('.') == -1) { + throw new IOException("Can't remove queue " + queueToRemove); + } + String parentQueuePath = queueToRemove.substring(0, queueToRemove + .lastIndexOf('.')); + String[] siblingQueues = proposedConf.getQueues(parentQueuePath); + List newSiblingQueues = new ArrayList<>(); + for 
(String siblingQueue : siblingQueues) { + if (!siblingQueue.equals(queueName)) { + newSiblingQueues.add(siblingQueue); + } + } + proposedConf.setQueues(parentQueuePath, newSiblingQueues + .toArray(new String[0])); + String queuesConfig = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + if (newSiblingQueues.size() == 0) { + confUpdate.put(queuesConfig, null); + } else { + confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); + } + for (Map.Entry confRemove : proposedConf.getValByRegex( + ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") + .entrySet()) { + proposedConf.unset(confRemove.getKey()); + confUpdate.put(confRemove.getKey(), null); + } + } + } + + private void addQueue( + QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (addInfo == null) { + return; + } else { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + String queuePath = addInfo.getQueue(); + String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); + if (cs.getQueue(queueName) != null) { + throw new IOException("Can't add existing queue " + queuePath); + } else if (queuePath.lastIndexOf('.') == -1) { + throw new IOException("Can't add invalid queue " + queuePath); + } + String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); + String[] siblings = proposedConf.getQueues(parentQueue); + List siblingQueues = siblings == null ? 
new ArrayList<>() : + new ArrayList<>(Arrays.asList(siblings)); + siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1)); + proposedConf.setQueues(parentQueue, + siblingQueues.toArray(new String[0])); + confUpdate.put(CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES, + Joiner.on(',').join(siblingQueues)); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : addInfo.getParams().entrySet()) { + if (kv.getValue() == null) { + proposedConf.unset(keyPrefix + kv.getKey()); + } else { + proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); + } + confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); + } + } + } + + private void updateQueue(QueueConfigInfo updateInfo, + CapacitySchedulerConfiguration proposedConf, + Map confUpdate) { + if (updateInfo == null) { + return; + } else { + String queuePath = updateInfo.getQueue(); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : updateInfo.getParams().entrySet()) { + if (kv.getValue() == null) { + proposedConf.unset(keyPrefix + kv.getKey()); + } else { + proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); + } + confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java new file mode 100644 index 0000000..b20eda6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.HashMap; +import java.util.Map; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Information for adding or updating a queue to scheduler configuration + * for this queue. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class QueueConfigInfo { + + @XmlElement(name = "queueName") + private String queue; + + private HashMap params = new HashMap<>(); + + public QueueConfigInfo() { } + + public QueueConfigInfo(String queue, Map params) { + this.queue = queue; + this.params = new HashMap<>(params); + } + + public String getQueue() { + return this.queue; + } + + public HashMap getParams() { + return this.params; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java new file mode 100644 index 0000000..644ec90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Information for making scheduler configuration changes (supports adding, + * removing, or updating a queue). + */ +@XmlRootElement(name = "schedConf") +@XmlAccessorType(XmlAccessType.FIELD) +public class QueueConfigsUpdateInfo { + + @XmlElement(name = "add") + private ArrayList addQueueInfo = new ArrayList<>(); + + @XmlElement(name = "remove") + private ArrayList removeQueueInfo = new ArrayList<>(); + + @XmlElement(name = "update") + private ArrayList updateQueueInfo = new ArrayList<>(); + + public QueueConfigsUpdateInfo() { + // JAXB needs this + } + + public ArrayList getAddQueueInfo() { + return addQueueInfo; + } + + public ArrayList getRemoveQueueInfo() { + return removeQueueInfo; + } + + public ArrayList getUpdateQueueInfo() { + return updateQueueInfo; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 3f103b1..254da31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -77,7 +77,11 @@ public class TestMutableCSConfigurationProvider { assertNull(confProvider.loadConfiguration(conf).get("badKey")); doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class), any(RMContext.class)); - confProvider.mutateConfiguration(TEST_USER, badUpdate); + try { + confProvider.mutateConfiguration(TEST_USER, badUpdate); + } catch (IOException e) { + // Expected exception. 
+ } assertNull(confProvider.loadConfiguration(conf).get("badKey")); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/94f81b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java new file mode 100644 index 0000000..d149055 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.google.inject.Guice; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.GuiceServletConfig; +import org.apache.hadoop.yarn.webapp.JerseyTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Test scheduler configuration mutation via REST API. 
+ */ +public class TestRMWebServicesConfigurationMutation extends JerseyTestBase { + + private static final File CONF_FILE = new File(new File("target", + "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE); + private static final File OLD_CONF_FILE = new File(new File("target", + "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp"); + + private static MockRM rm; + private static String userName; + private static CapacitySchedulerConfiguration csConf; + private static YarnConfiguration conf; + + private static class WebServletModule extends ServletModule { + @Override + protected void configureServlets() { + bind(JAXBContextResolver.class); + bind(RMWebServices.class); + bind(GenericExceptionHandler.class); + try { + userName = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException ioe) { + throw new RuntimeException("Unable to get current user name " + + ioe.getMessage(), ioe); + } + csConf = new CapacitySchedulerConfiguration(new Configuration(false), + false); + setupQueueConfiguration(csConf); + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName); + try { + if (CONF_FILE.exists()) { + if (!CONF_FILE.renameTo(OLD_CONF_FILE)) { + throw new RuntimeException("Failed to rename conf file"); + } + } + FileOutputStream out = new FileOutputStream(CONF_FILE); + csConf.writeXml(out); + out.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to write XML file", e); + } + rm = new MockRM(conf); + bind(ResourceManager.class).toInstance(rm); + serve("/*").with(GuiceContainer.class); + filter("/*").through(TestRMWebServicesAppsModification + .TestRMCustomAuthFilter.class); + } + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); 
+ GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + } + + private static void setupQueueConfiguration( + CapacitySchedulerConfiguration config) { + config.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b", "c"}); + + final String a = CapacitySchedulerConfiguration.ROOT + ".a"; + config.setCapacity(a, 25f); + config.setMaximumCapacity(a, 50f); + + final String a1 = a + ".a1"; + final String a2 = a + ".a2"; + config.setQueues(a, new String[]{"a1", "a2"}); + config.setCapacity(a1, 100f); + config.setCapacity(a2, 0f); + + final String b = CapacitySchedulerConfiguration.ROOT + ".b"; + config.setCapacity(b, 75f); + + final String c = CapacitySchedulerConfiguration.ROOT + ".c"; + config.setCapacity(c, 0f); + + final String c1 = c + ".c1"; + config.setQueues(c, new String[] {"c1"}); + config.setCapacity(c1, 0f); + } + + public TestRMWebServicesConfigurationMutation() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.yarn.server.resourcemanager.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + .contextPath("jersey-guice-filter").servletPath("/").build()); + } + + @Test + public void testAddNestedQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Add parent queue root.d with two children d1 and d2. 
+ QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + Map<String, String> d1Capacity = new HashMap<>(); + d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25"); + d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25"); + Map<String, String> nearEmptyCapacity = new HashMap<>(); + nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4"); + nearEmptyCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, + "1E-4"); + Map<String, String> d2Capacity = new HashMap<>(); + d2Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "75"); + d2Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "75"); + QueueConfigInfo d1 = new QueueConfigInfo("root.d.d1", d1Capacity); + QueueConfigInfo d2 = new QueueConfigInfo("root.d.d2", d2Capacity); + QueueConfigInfo d = new QueueConfigInfo("root.d", nearEmptyCapacity); + updateInfo.getAddQueueInfo().add(d1); + updateInfo.getAddQueueInfo().add(d2); + updateInfo.getAddQueueInfo().add(d); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(4, newCSConf.getQueues("root").length); + assertEquals(2, newCSConf.getQueues("root.d").length); + assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d1"), + 0.01f); + assertEquals(75.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d2"), + 0.01f); + } + + @Test + public void testAddWithUpdate() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
+ QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + Map<String, String> dCapacity = new HashMap<>(); + dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25"); + Map<String, String> bCapacity = new HashMap<>(); + bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "50"); + QueueConfigInfo d = new QueueConfigInfo("root.d", dCapacity); + QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity); + updateInfo.getAddQueueInfo().add(d); + updateInfo.getUpdateQueueInfo().add(b); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(4, newCSConf.getQueues("root").length); + assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d"), 0.01f); + assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f); + } + + @Test + public void testRemoveQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.a.a2"); + // Remove root.a.a2 + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("root.a.a2"); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(1, newCSConf.getQueues("root.a").length); + assertEquals("a1", newCSConf.getQueues("root.a")[0]); + } + +
@Test + public void testRemoveParentQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.c", "root.c.c1"); + // Remove root.c (parent queue) + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("root.c"); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(2, newCSConf.getQueues("root").length); + assertNull(newCSConf.getQueues("root.c")); + } + + @Test + public void testRemoveParentQueueWithCapacity() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.a", "root.a.a1", "root.a.a2"); + // Remove root.a (parent queue) with capacity 25 + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("root.a"); + + // Set root.b capacity to 100 + Map<String, String> bCapacity = new HashMap<>(); + bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100"); + QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity); + updateInfo.getUpdateQueueInfo().add(b); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(2, newCSConf.getQueues("root").length); + assertEquals(100.0f,
newCSConf.getNonLabeledQueueCapacity("root.b"), + 0.01f); + } + + @Test + public void testRemoveMultipleQueues() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.b", "root.c", "root.c.c1"); + // Remove root.b and root.c + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("root.b"); + updateInfo.getRemoveQueueInfo().add("root.c"); + Map<String, String> aCapacity = new HashMap<>(); + aCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100"); + aCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "100"); + QueueConfigInfo configInfo = new QueueConfigInfo("root.a", aCapacity); + updateInfo.getUpdateQueueInfo().add(configInfo); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(1, newCSConf.getQueues("root").length); + } + + private void stopQueue(String... queuePaths) throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Set state of queues to STOPPED.
+ QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + Map<String, String> stoppedParam = new HashMap<>(); + stoppedParam.put(CapacitySchedulerConfiguration.STATE, + QueueState.STOPPED.toString()); + for (String queue : queuePaths) { + QueueConfigInfo stoppedInfo = new QueueConfigInfo(queue, stoppedParam); + updateInfo.getUpdateQueueInfo().add(stoppedInfo); + } + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + for (String queue : queuePaths) { + assertEquals(QueueState.STOPPED, newCSConf.getState(queue)); + } + } + + @Test + public void testUpdateQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Update config value.
+ QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + Map<String, String> updateParam = new HashMap<>(); + updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX, + "0.2"); + QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam); + updateInfo.getUpdateQueueInfo().add(aUpdateInfo); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + assertEquals(CapacitySchedulerConfiguration + .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, + cs.getConfiguration() + .getMaximumApplicationMasterResourcePerQueuePercent("root.a"), + 0.001f); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = cs.getConfiguration(); + assertEquals(0.2f, newCSConf + .getMaximumApplicationMasterResourcePerQueuePercent("root.a"), 0.001f); + + // Remove config. Config value should be reverted to default.
+ updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX, + null); + aUpdateInfo = new QueueConfigInfo("root.a", updateParam); + updateInfo.getUpdateQueueInfo().clear(); + updateInfo.getUpdateQueueInfo().add(aUpdateInfo); + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + newCSConf = cs.getConfiguration(); + assertEquals(CapacitySchedulerConfiguration + .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, newCSConf + .getMaximumApplicationMasterResourcePerQueuePercent("root.a"), + 0.001f); + } + + @Test + public void testUpdateQueueCapacity() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Update root.a and root.b capacity to 50. + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + Map<String, String> updateParam = new HashMap<>(); + updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50"); + QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam); + QueueConfigInfo bUpdateInfo = new QueueConfigInfo("root.b", updateParam); + updateInfo.getUpdateQueueInfo().add(aUpdateInfo); + updateInfo.getUpdateQueueInfo().add(bUpdateInfo); + + response = + r.path("ws").path("v1").path("cluster") + .path("queues").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(updateInfo, QueueConfigsUpdateInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.a"), 0.01f); + assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"),
0.01f); + } + + @Override + @After + public void tearDown() throws Exception { + if (rm != null) { + rm.stop(); + } + CONF_FILE.delete(); + if (!OLD_CONF_FILE.renameTo(CONF_FILE)) { + throw new RuntimeException("Failed to re-copy old configuration file"); + } + super.tearDown(); + } + + @SuppressWarnings("rawtypes") + private String toJson(Object nsli, Class klass) throws Exception { + StringWriter sw = new StringWriter(); + JSONJAXBContext ctx = new JSONJAXBContext(klass); + JSONMarshaller jm = ctx.createJSONMarshaller(); + jm.marshallToJSON(nsli, sw); + return sw.toString(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org