Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B5BEF200D22 for ; Sat, 7 Oct 2017 02:06:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B43A3160BDE; Sat, 7 Oct 2017 00:06:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 347D01609E1 for ; Sat, 7 Oct 2017 02:06:20 +0200 (CEST) Received: (qmail 99754 invoked by uid 500); 7 Oct 2017 00:06:08 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 98280 invoked by uid 99); 7 Oct 2017 00:06:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 07 Oct 2017 00:06:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB342F5D0B; Sat, 7 Oct 2017 00:06:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhung@apache.org To: common-commits@hadoop.apache.org Date: Sat, 07 Oct 2017 00:06:51 -0000 Message-Id: In-Reply-To: <93066f4316f9400dbe0da147bb752bf8@git.apache.org> References: <93066f4316f9400dbe0da147bb752bf8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] [abbrv] hadoop git commit: YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda) archived-at: Sat, 07 Oct 2017 00:06:22 -0000 YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda) Change-Id: I9debea674fe8c7e4109d4ca136965a1ea4c48bcc Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79fb0a58 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79fb0a58 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79fb0a58 Branch: refs/heads/YARN-5734 Commit: 79fb0a5841bd10e32b1bbbade1a09e38514b2afc Parents: 33f1074 Author: Wangda Tan Authored: Mon Sep 18 09:53:42 2017 -0700 Committer: Jonathan Hung Committed: Fri Oct 6 16:54:53 2017 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 14 +- .../src/main/resources/yarn-default.xml | 15 +- .../server/resourcemanager/AdminService.java | 18 +- .../server/resourcemanager/ResourceManager.java | 24 +- .../RMStateVersionIncompatibleException.java | 2 +- .../recovery/ZKRMStateStore.java | 5 +- .../scheduler/MutableConfScheduler.java | 22 +- .../scheduler/MutableConfigurationProvider.java | 36 ++- .../scheduler/capacity/CapacityScheduler.java | 22 +- .../conf/InMemoryConfigurationStore.java | 71 +++-- .../conf/LeveldbConfigurationStore.java | 168 +++++----- .../conf/MutableCSConfigurationProvider.java | 148 +++++---- .../capacity/conf/YarnConfigurationStore.java | 132 ++++---- .../capacity/conf/ZKConfigurationStore.java | 235 ++++++++++++++ .../resourcemanager/webapp/RMWebServices.java | 26 +- .../conf/ConfigurationStoreBaseTest.java | 90 ++++++ .../conf/TestInMemoryConfigurationStore.java | 30 ++ .../TestMutableCSConfigurationProvider.java | 18 +- .../conf/TestYarnConfigurationStore.java | 71 ----- .../capacity/conf/TestZKConfigurationStore.java | 312 +++++++++++++++++++ 20 files changed, 1037 insertions(+), 422 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ea8652d..8809a7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -678,6 +678,7 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "scheduler.configuration.store.class"; public static final String MEMORY_CONFIGURATION_STORE = "memory"; public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; + public static final String ZK_CONFIGURATION_STORE = "zk"; public static final String DEFAULT_CONFIGURATION_STORE = MEMORY_CONFIGURATION_STORE; public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX @@ -689,9 +690,16 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L; - public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS = - YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs"; - public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; + public static final String RM_SCHEDCONF_MAX_LOGS = + YARN_PREFIX + "scheduler.configuration.store.max-logs"; + public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; + public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000; + + /** Parent znode path under which ZKConfigurationStore will create znodes. */ + public static final String RM_SCHEDCONF_STORE_ZK_PARENT_PATH = YARN_PREFIX + + "scheduler.configuration.zk-store.parent-path"; + public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH = + "/confstore"; public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5afec1b..4516edf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3412,11 +3412,20 @@ - The max number of configuration change log entries kept in LevelDB config + The max number of configuration change log entries kept in config store, when yarn.scheduler.configuration.store.class is configured to be - "leveldb". Default is 1000. + "leveldb" or "zk". Default is 1000 for either. - yarn.scheduler.configuration.leveldb-store.max-logs + yarn.scheduler.configuration.store.max-logs 1000 + + + + ZK root node path for configuration store when using zookeeper-based + configuration store. + + yarn.scheduler.configuration.zk-store.parent-path + /confstore + http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index fd9e849..6c0a854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -387,9 +387,7 @@ public class AdminService extends CompositeService implements RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - ResourceScheduler scheduler = rm.getRMContext().getScheduler(); - if (scheduler instanceof MutableConfScheduler - && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + if (isSchedulerMutable()) { throw new IOException("Scheduler configuration is mutable. " + operation + " is not allowed in this scenario."); } @@ -413,6 +411,12 @@ public class AdminService extends CompositeService implements } } + private boolean isSchedulerMutable() { + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + return (scheduler instanceof MutableConfScheduler + && ((MutableConfScheduler) scheduler).isConfigurationMutable()); + } + @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { @@ -721,6 +725,14 @@ public class AdminService extends CompositeService implements void refreshAll() throws ServiceFailedException { try { checkAcls("refreshAll"); + if (isSchedulerMutable()) { + try { + ((MutableConfScheduler) rm.getRMContext().getScheduler()) + .getMutableConfProvider().reloadConfigurationFromStore(); + } catch (Exception e) { + throw new IOException("Failed to refresh configuration:", e); + } + } refreshQueues(); refreshNodes(); refreshSuperUserGroupsConfiguration(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index fd95ee5..d9312f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -351,7 +351,7 @@ public class ResourceManager extends CompositeService implements Recoverable { conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { - this.zkManager = createAndStartZKManager(conf); + this.zkManager = getAndStartZKManager(conf); elector = new CuratorBasedElectorService(this); } else { elector = new ActiveStandbyElectorBasedElectorService(this); @@ -360,13 +360,16 @@ public class ResourceManager extends CompositeService implements Recoverable { } /** - * Create and ZooKeeper Curator manager. + * Get ZooKeeper Curator manager, creating and starting if not exists. * @param config Configuration for the ZooKeeper curator. - * @return New ZooKeeper Curator manager. + * @return ZooKeeper Curator manager. * @throws IOException If it cannot create the manager. */ - public ZKCuratorManager createAndStartZKManager(Configuration config) - throws IOException { + public synchronized ZKCuratorManager getAndStartZKManager(Configuration + config) throws IOException { + if (this.zkManager != null) { + return zkManager; + } ZKCuratorManager manager = new ZKCuratorManager(config); // Get authentication @@ -386,15 +389,8 @@ public class ResourceManager extends CompositeService implements Recoverable { } manager.start(authInfos); - return manager; - } - - /** - * Get the ZooKeeper Curator manager. - * @return ZooKeeper Curator manager. - */ - public ZKCuratorManager getZKManager() { - return this.zkManager; + this.zkManager = manager; + return zkManager; } public CuratorFramework getCurator() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java index 135868f..d5fce36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java @@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; /** * This exception is thrown by ResourceManager if it's loading an incompatible - * version of state from state store on recovery. + * version of storage on recovery. */ public class RMStateVersionIncompatibleException extends YarnException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index ac67dcd..5bff77f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -327,10 +327,7 @@ public class ZKRMStateStore extends RMStateStore { amrmTokenSecretManagerRoot = getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); - zkManager = resourceManager.getZKManager(); - if (zkManager == null) { - zkManager = resourceManager.createAndStartZKManager(conf); - } + zkManager = resourceManager.getAndStartZKManager(conf); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java index 313bf6a..6f677fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java @@ -18,11 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; - -import java.io.IOException; /** * Interface for a scheduler that supports changing configuration at runtime. @@ -31,16 +26,6 @@ import java.io.IOException; public interface MutableConfScheduler extends ResourceScheduler { /** - * Update the scheduler's configuration. - * @param user Caller of this update - * @param confUpdate configuration update - * @throws IOException if scheduler could not be reinitialized - * @throws YarnException if reservation system could not be reinitialized - */ - void updateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException; - - /** * Get the scheduler configuration. * @return the scheduler configuration */ @@ -58,4 +43,11 @@ public interface MutableConfScheduler extends ResourceScheduler { * @return whether scheduler configuration is mutable or not. */ boolean isConfigurationMutable(); + + /** + * Get scheduler's configuration provider, so other classes can directly + * call mutation APIs on configuration provider. + * @return scheduler's configuration provider + */ + MutableConfigurationProvider getMutableConfProvider(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 9baf1ad..f8e8814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -19,30 +19,40 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; -import java.io.IOException; - /** * Interface for allowing changing scheduler configurations. */ public interface MutableConfigurationProvider { /** - * Apply transactions which were not committed. - * @throws IOException if recovery fails + * Get the acl mutation policy for this configuration provider. + * @return The acl mutation policy. + */ + ConfigurationMutationACLPolicy getAclMutationPolicy(); + + /** + * Called when a new ResourceManager is starting/becomes active. Ensures + * configuration is up-to-date. + * @throws Exception if configuration could not be refreshed from store */ - void recoverConf() throws IOException; + void reloadConfigurationFromStore() throws Exception; /** - * Update the scheduler configuration with the provided key value pairs. - * @param user User issuing the request - * @param confUpdate Key-value pairs for configurations to be updated. - * @throws IOException if scheduler could not be reinitialized - * @throws YarnException if reservation system could not be reinitialized + * Log user's requested configuration mutation, and applies it in-memory. + * @param user User who requested the change + * @param confUpdate User's requested configuration change + * @throws Exception if logging the mutation fails */ - void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo - confUpdate) throws IOException, YarnException; + void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo + confUpdate) throws Exception; + /** + * Confirm last logged mutation. + * @param isValid if the last logged mutation is applied to scheduler + * properly. + * @throws Exception if confirming mutation fails + */ + void confirmPendingMutation(boolean isValid) throws Exception; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 51ee6a7..16b27c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -141,7 +141,6 @@ import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -393,9 +392,6 @@ public class CapacityScheduler extends @Override public void serviceStart() throws Exception { startSchedulerThreads(); - if (this.csConfProvider instanceof MutableConfigurationProvider) { - ((MutableConfigurationProvider) csConfProvider).recoverConf(); - } super.serviceStart(); } @@ -2619,19 +2615,15 @@ public class CapacityScheduler extends } @Override - public void updateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException { - if (isConfigurationMutable()) { - ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( - user, confUpdate); - } else { - throw new UnsupportedOperationException("Configured CS configuration " + - "provider does not support updating configuration."); - } + public boolean isConfigurationMutable() { + return csConfProvider instanceof MutableConfigurationProvider; } @Override - public boolean isConfigurationMutable() { - return csConfProvider instanceof MutableConfigurationProvider; + public MutableConfigurationProvider getMutableConfProvider() { + if (isConfigurationMutable()) { + return (MutableConfigurationProvider) csConfProvider; + } + return null; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index c63734d..d69c236 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -19,8 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -28,48 +29,35 @@ import java.util.Map; * A default implementation of {@link YarnConfigurationStore}. Doesn't offer * persistent configuration storage, just stores the configuration in memory. */ -public class InMemoryConfigurationStore implements YarnConfigurationStore { +public class InMemoryConfigurationStore extends YarnConfigurationStore { private Configuration schedConf; - private LinkedList pendingMutations; - private long pendingId; + private LogMutation pendingMutation; @Override - public void initialize(Configuration conf, Configuration schedConf) { + public void initialize(Configuration conf, Configuration schedConf, + RMContext rmContext) { this.schedConf = schedConf; - this.pendingMutations = new LinkedList<>(); - this.pendingId = 0; } @Override - public synchronized long logMutation(LogMutation logMutation) { - logMutation.setId(++pendingId); - pendingMutations.add(logMutation); - return pendingId; + public void logMutation(LogMutation logMutation) { + pendingMutation = logMutation; } @Override - public synchronized boolean confirmMutation(long id, boolean isValid) { - LogMutation mutation = pendingMutations.poll(); - // If confirmMutation is called out of order, discard mutations until id - // is reached. - while (mutation != null) { - if (mutation.getId() == id) { - if (isValid) { - Map mutations = mutation.getUpdates(); - for (Map.Entry kv : mutations.entrySet()) { - if (kv.getValue() == null) { - schedConf.unset(kv.getKey()); - } else { - schedConf.set(kv.getKey(), kv.getValue()); - } - } + public void confirmMutation(boolean isValid) { + if (isValid) { + for (Map.Entry kv : pendingMutation.getUpdates() + .entrySet()) { + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); } - return true; } - mutation = pendingMutations.poll(); } - return false; + pendingMutation = null; } @Override @@ -78,13 +66,30 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore { } @Override - public synchronized List getPendingMutations() { - return new LinkedList<>(pendingMutations); + public List getConfirmedConfHistory(long fromId) { + // Unimplemented. + return null; } @Override - public List getConfirmedConfHistory(long fromId) { - // Unimplemented. + public Version getConfStoreVersion() throws Exception { + // Does nothing. return null; } + + @Override + public void storeVersion() throws Exception { + // Does nothing. + } + + @Override + public Version getCurrentVersion() { + // Does nothing. + return null; + } + + @Override + public void checkVersion() { + // Does nothing. (Version is always compatible since it's in memory) + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 1280fab..1b0eb9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -26,6 +26,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; @@ -55,58 +59,32 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes; /** * A LevelDB implementation of {@link YarnConfigurationStore}. */ -public class LeveldbConfigurationStore implements YarnConfigurationStore { +public class LeveldbConfigurationStore extends YarnConfigurationStore { public static final Log LOG = LogFactory.getLog(LeveldbConfigurationStore.class); private static final String DB_NAME = "yarn-conf-store"; - private static final String LOG_PREFIX = "log."; - private static final String LOG_COMMITTED_TXN = "committedTxn"; + private static final String LOG_KEY = "log"; + private static final String VERSION_KEY = "version"; private DB db; - // Txnid for the last transaction logged to the store. - private long txnId = 0; - private long minTxn = 0; private long maxLogs; private Configuration conf; - private LinkedList pendingMutations = new LinkedList<>(); + private LogMutation pendingMutation; + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(0, 1); private Timer compactionTimer; private long compactionIntervalMsec; @Override - public void initialize(Configuration config, Configuration schedConf) - throws IOException { + public void initialize(Configuration config, Configuration schedConf, + RMContext rmContext) throws IOException { this.conf = config; try { this.db = initDatabase(schedConf); - this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)), - StandardCharsets.UTF_8)); - DBIterator itr = db.iterator(); - itr.seek(bytes(LOG_PREFIX + txnId)); - // Seek to first uncommitted log - itr.next(); - while (itr.hasNext()) { - Map.Entry entry = itr.next(); - if (!new String(entry.getKey(), StandardCharsets.UTF_8) - .startsWith(LOG_PREFIX)) { - break; - } - pendingMutations.add(deserLogMutation(entry.getValue())); - txnId++; - } - // Get the earliest txnId stored in logs - itr.seekToFirst(); - if (itr.hasNext()) { - Map.Entry entry = itr.next(); - byte[] key = entry.getKey(); - String logId = new String(key, StandardCharsets.UTF_8); - if (logId.startsWith(LOG_PREFIX)) { - minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1)); - } - } this.maxLogs = config.getLong( - YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS, + YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); this.compactionIntervalMsec = config.getLong( YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, @@ -127,33 +105,23 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { public int compare(byte[] key1, byte[] key2) { String key1Str = new String(key1, StandardCharsets.UTF_8); String key2Str = new String(key2, StandardCharsets.UTF_8); - int key1Txn = Integer.MAX_VALUE; - int key2Txn = Integer.MAX_VALUE; - if (key1Str.startsWith(LOG_PREFIX)) { - key1Txn = Integer.parseInt(key1Str.substring( - key1Str.indexOf('.') + 1)); - } - if (key2Str.startsWith(LOG_PREFIX)) { - key2Txn = Integer.parseInt(key2Str.substring( - key2Str.indexOf('.') + 1)); - } - // TODO txnId could overflow, in theory - if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) { - if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) { - return 0; - } else if (key1Str.equals(LOG_COMMITTED_TXN)) { - return -1; - } else if (key2Str.equals(LOG_COMMITTED_TXN)) { - return 1; - } - return key1Str.compareTo(key2Str); + if (key1Str.equals(key2Str)) { + return 0; + } else if (key1Str.equals(VERSION_KEY)) { + return -1; + } else if (key2Str.equals(VERSION_KEY)) { + return 1; + } else if (key1Str.equals(LOG_KEY)) { + return -1; + } else if (key2Str.equals(LOG_KEY)) { + return 1; } - return key1Txn - key2Txn; + return key1Str.compareTo(key2Str); } @Override public String name() { - return "logComparator"; + return "keyComparator"; } public byte[] findShortestSeparator(byte[] start, byte[] limit) { @@ -164,6 +132,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { return key; } }); + LOG.info("Using conf database at " + storeRoot); File dbfile = new File(storeRoot.toString()); try { @@ -179,7 +148,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { for (Map.Entry kv : config) { initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); } - initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0")); db.write(initBatch); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); @@ -208,28 +176,22 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } @Override - public synchronized long logMutation(LogMutation logMutation) - throws IOException { - logMutation.setId(++txnId); - WriteBatch logBatch = db.createWriteBatch(); - logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation)); - if (txnId - minTxn >= maxLogs) { - logBatch.delete(bytes(LOG_PREFIX + minTxn)); - minTxn++; + public void logMutation(LogMutation logMutation) throws IOException { + LinkedList logs = deserLogMutations(db.get(bytes(LOG_KEY))); + logs.add(logMutation); + if (logs.size() > maxLogs) { + logs.removeFirst(); } - db.write(logBatch); - pendingMutations.add(logMutation); - return txnId; + db.put(bytes(LOG_KEY), serLogMutations(logs)); + pendingMutation = logMutation; } @Override - public synchronized boolean confirmMutation(long id, boolean isValid) - throws IOException { + public void confirmMutation(boolean isValid) throws IOException { WriteBatch updateBatch = db.createWriteBatch(); if (isValid) { - LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id))); for (Map.Entry changes : - mutation.getUpdates().entrySet()) { + pendingMutation.getUpdates().entrySet()) { if (changes.getValue() == null || changes.getValue().isEmpty()) { updateBatch.delete(bytes(changes.getKey())); } else { @@ -237,28 +199,24 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } } } - updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id))); db.write(updateBatch); - // Assumes logMutation and confirmMutation are done in the same - // synchronized method. For example, - // {@link MutableCSConfigurationProvider#mutateConfiguration( - // UserGroupInformation user, SchedConfUpdateInfo confUpdate)} - pendingMutations.removeFirst(); - return true; + pendingMutation = null; } - private byte[] serLogMutation(LogMutation mutation) throws IOException { + private byte[] serLogMutations(LinkedList mutations) throws + IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (ObjectOutput oos = new ObjectOutputStream(baos)) { - oos.writeObject(mutation); + oos.writeObject(mutations); oos.flush(); return baos.toByteArray(); } } - private LogMutation deserLogMutation(byte[] mutation) throws IOException { + private LinkedList deserLogMutations(byte[] mutations) throws + IOException { try (ObjectInput input = new ObjectInputStream( - new ByteArrayInputStream(mutation))) { - return (LogMutation) input.readObject(); + new ByteArrayInputStream(mutations))) { + return (LinkedList) input.readObject(); } catch (ClassNotFoundException e) { throw new IOException(e); } @@ -267,7 +225,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { @Override public synchronized Configuration retrieve() { DBIterator itr = db.iterator(); - itr.seek(bytes(LOG_COMMITTED_TXN)); + itr.seek(bytes(LOG_KEY)); Configuration config = new Configuration(false); itr.next(); while (itr.hasNext()) { @@ -279,11 +237,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } @Override - public List getPendingMutations() { - return new LinkedList<>(pendingMutations); - } - - @Override public List getConfirmedConfHistory(long fromId) { return null; // unimplemented } @@ -299,6 +252,39 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } } + // TODO: following is taken from LeveldbRMStateStore + @Override + public Version getConfStoreVersion() throws Exception { + Version version = null; + try { + byte[] data = db.get(bytes(VERSION_KEY)); + if (data != null) { + version = new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + } catch (DBException e) { + throw new IOException(e); + } + return version; + } + + @Override + public void storeVersion() throws Exception { + String key = VERSION_KEY; + byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() + .toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + private class CompactionTimerTask extends TimerTask { @Override public void run() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index d03b2e2..70d1840 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -18,20 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; @@ -56,6 +53,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, LogFactory.getLog(MutableCSConfigurationProvider.class); private Configuration schedConf; + private Configuration oldConf; private YarnConfigurationStore confStore; private ConfigurationMutationACLPolicy aclMutationPolicy; private RMContext rmContext; @@ -76,6 +74,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: this.confStore = new LeveldbConfigurationStore(); break; + case YarnConfiguration.ZK_CONFIGURATION_STORE: + this.confStore = new ZKConfigurationStore(); + break; default: this.confStore = YarnConfigurationStoreFactory.getStore(config); break; @@ -89,7 +90,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, for (Map.Entry kv : initialSchedConf) { schedConf.set(kv.getKey(), kv.getValue()); } - confStore.initialize(config, schedConf); + try { + confStore.initialize(config, schedConf, rmContext); + } catch (Exception e) { + throw new IOException(e); + } // After initializing confStore, the store may already have an existing // configuration. Use this one. schedConf = confStore.retrieve(); @@ -98,6 +103,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, aclMutationPolicy.init(config, rmContext); } + @VisibleForTesting + public YarnConfigurationStore getConfStore() { + return confStore; + } + @Override public CapacitySchedulerConfiguration loadConfiguration(Configuration configuration) throws IOException { @@ -107,16 +117,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } @Override - public synchronized void mutateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException { - if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { - throw new AccessControlException("User is not admin of all modified" + - " queues."); - } - Configuration oldConf = new Configuration(schedConf); + public ConfigurationMutationACLPolicy getAclMutationPolicy() { + return aclMutationPolicy; + } + + @Override + public void logAndApplyMutation(UserGroupInformation user, + SchedConfUpdateInfo confUpdate) throws Exception { + oldConf = new Configuration(schedConf); Map kvUpdate = constructKeyValueConfUpdate(confUpdate); LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); - long id = confStore.logMutation(log); + confStore.logMutation(log); for (Map.Entry kv : kvUpdate.entrySet()) { if (kv.getValue() == null) { schedConf.unset(kv.getKey()); @@ -124,47 +135,33 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, schedConf.set(kv.getKey(), kv.getValue()); } } - try { - rmContext.getRMAdminService().refreshQueues(); - } catch (IOException | YarnException e) { + } + + @Override + public void confirmPendingMutation(boolean isValid) throws Exception { + confStore.confirmMutation(isValid); + if (!isValid) { schedConf = oldConf; - confStore.confirmMutation(id, false); - throw e; } - confStore.confirmMutation(id, true); } @Override - public void recoverConf() throws IOException { - List uncommittedLogs = confStore.getPendingMutations(); - Configuration oldConf = new Configuration(schedConf); - for (LogMutation mutation : uncommittedLogs) { - for (Map.Entry kv : mutation.getUpdates().entrySet()) { - if (kv.getValue() == null) { - schedConf.unset(kv.getKey()); - } else { - schedConf.set(kv.getKey(), kv.getValue()); - } - } - try { - rmContext.getScheduler().reinitialize(schedConf, rmContext); - } catch (IOException e) { - schedConf = oldConf; - confStore.confirmMutation(mutation.getId(), false); - LOG.info("Configuration mutation " + mutation.getId() - + " was rejected", e); - continue; - } - confStore.confirmMutation(mutation.getId(), true); - LOG.info("Configuration mutation " + mutation.getId()+ " was accepted"); - } + public void reloadConfigurationFromStore() throws Exception { + schedConf = confStore.retrieve(); + } + + private List getSiblingQueues(String queuePath, Configuration conf) { + String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); + String childQueuesKey = CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + return new ArrayList<>(conf.getStringCollection(childQueuesKey)); } private Map constructKeyValueConfUpdate( SchedConfUpdateInfo mutationInfo) throws IOException { - CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); CapacitySchedulerConfiguration proposedConf = - new CapacitySchedulerConfiguration(cs.getConfiguration(), false); + new CapacitySchedulerConfiguration(schedConf, false); Map confUpdate = new HashMap<>(); for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { removeQueue(queueToRemove, proposedConf, confUpdate); @@ -188,40 +185,35 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, if (queueToRemove == null) { return; } else { - CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); String queueName = queueToRemove.substring( queueToRemove.lastIndexOf('.') + 1); - CSQueue queue = cs.getQueue(queueName); - if (queue == null || - !queue.getQueuePath().equals(queueToRemove)) { - throw new IOException("Queue " + queueToRemove + " not found"); - } else if (queueToRemove.lastIndexOf('.') == -1) { + if (queueToRemove.lastIndexOf('.') == -1) { throw new IOException("Can't remove queue " + queueToRemove); - } - String parentQueuePath = queueToRemove.substring(0, queueToRemove - .lastIndexOf('.')); - String[] siblingQueues = proposedConf.getQueues(parentQueuePath); - List newSiblingQueues = new ArrayList<>(); - for (String siblingQueue : siblingQueues) { - if (!siblingQueue.equals(queueName)) { - newSiblingQueues.add(siblingQueue); - } - } - proposedConf.setQueues(parentQueuePath, newSiblingQueues - .toArray(new String[0])); - String queuesConfig = CapacitySchedulerConfiguration.PREFIX - + parentQueuePath + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.QUEUES; - if (newSiblingQueues.size() == 0) { - confUpdate.put(queuesConfig, null); } else { - confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); - } - for (Map.Entry confRemove : proposedConf.getValByRegex( - ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") - .entrySet()) { - proposedConf.unset(confRemove.getKey()); - confUpdate.put(confRemove.getKey(), null); + List siblingQueues = getSiblingQueues(queueToRemove, + proposedConf); + if (!siblingQueues.contains(queueName)) { + throw new IOException("Queue " + queueToRemove + " not found"); + } + siblingQueues.remove(queueName); + String parentQueuePath = queueToRemove.substring(0, queueToRemove + .lastIndexOf('.')); + proposedConf.setQueues(parentQueuePath, siblingQueues.toArray( + new String[0])); + String queuesConfig = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + if (siblingQueues.size() == 0) { + confUpdate.put(queuesConfig, null); + } else { + confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues)); + } + for (Map.Entry confRemove : proposedConf.getValByRegex( + ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") + .entrySet()) { + proposedConf.unset(confRemove.getKey()); + confUpdate.put(confRemove.getKey(), null); + } } } } @@ -232,13 +224,13 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, if (addInfo == null) { return; } else { - CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); String queuePath = addInfo.getQueue(); String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); - if (cs.getQueue(queueName) != null) { - throw new IOException("Can't add existing queue " + queuePath); - } else if (queuePath.lastIndexOf('.') == -1) { + if (queuePath.lastIndexOf('.') == -1) { throw new IOException("Can't add invalid queue " + queuePath); + } else if (getSiblingQueues(queuePath, proposedConf).contains( + queueName)) { + throw new IOException("Can't add existing queue " + queuePath); } String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); String[] siblings = proposedConf.getQueues(parentQueue); http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 065c877..1356535 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -18,7 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import java.io.IOException; @@ -39,36 +44,26 @@ import java.util.Map; * {@code getPendingMutations}, and replay/confirm them via * {@code confirmMutation} as in the normal case. */ -public interface YarnConfigurationStore { +public abstract class YarnConfigurationStore { + public static final Log LOG = + LogFactory.getLog(YarnConfigurationStore.class); /** * LogMutation encapsulates the fields needed for configuration mutation * audit logging and recovery. */ - class LogMutation implements Serializable { + static class LogMutation implements Serializable { private Map updates; private String user; - private long id; /** - * Create log mutation prior to logging. + * Create log mutation. * @param updates key-value configuration updates * @param user user who requested configuration change */ - public LogMutation(Map updates, String user) { - this(updates, user, 0); - } - - /** - * Create log mutation for recovery. - * @param updates key-value configuration updates - * @param user user who requested configuration change - * @param id transaction id of configuration change - */ - LogMutation(Map updates, String user, long id) { + LogMutation(Map updates, String user) { this.updates = updates; this.user = user; - this.id = id; } /** @@ -86,75 +81,92 @@ public interface YarnConfigurationStore { public String getUser() { return user; } - - /** - * Get transaction id of this configuration change. - * @return transaction id - */ - public long getId() { - return id; - } - - /** - * Set transaction id of this configuration change. - * @param id transaction id - */ - public void setId(long id) { - this.id = id; - } } /** - * Initialize the configuration store. + * Initialize the configuration store, with schedConf as the initial + * scheduler configuration. If a persisted store already exists, use the + * scheduler configuration stored there, and ignore schedConf. * @param conf configuration to initialize store with - * @param schedConf Initial key-value configuration to persist + * @param schedConf Initial key-value scheduler configuration to persist. + * @param rmContext RMContext for this configuration store * @throws IOException if initialization fails */ - void initialize(Configuration conf, Configuration schedConf) - throws IOException; + public abstract void initialize(Configuration conf, Configuration schedConf, + RMContext rmContext) throws Exception; /** - * Logs the configuration change to backing store. Generates an id associated - * with this mutation, sets it in {@code logMutation}, and returns it. + * Logs the configuration change to backing store. * @param logMutation configuration change to be persisted in write ahead log - * @return id which configuration store associates with this mutation * @throws IOException if logging fails */ - long logMutation(LogMutation logMutation) throws IOException; + public abstract void logMutation(LogMutation logMutation) throws Exception; /** * Should be called after {@code logMutation}. Gets the pending mutation - * associated with {@code id} and marks the mutation as persisted (no longer - * pending). If isValid is true, merge the mutation with the persisted + * last logged by {@code logMutation} and marks the mutation as persisted (no + * longer pending). If isValid is true, merge the mutation with the persisted * configuration. - * - * If {@code confirmMutation} is called with ids in a different order than - * was returned by {@code logMutation}, the result is implementation - * dependent. - * @param id id of mutation to be confirmed - * @param isValid if true, update persisted configuration with mutation - * associated with {@code id}. - * @return true on success - * @throws IOException if mutation confirmation fails + * @param isValid if true, update persisted configuration with pending + * mutation. + * @throws Exception if mutation confirmation fails */ - boolean confirmMutation(long id, boolean isValid) throws IOException; + public abstract void confirmMutation(boolean isValid) throws Exception; /** * Retrieve the persisted configuration. * @return configuration as key-value */ - Configuration retrieve(); - - /** - * Get the list of pending mutations, in the order they were logged. - * @return list of mutations - */ - List getPendingMutations(); + public abstract Configuration retrieve(); /** * Get a list of confirmed configuration mutations starting from a given id. * @param fromId id from which to start getting mutations, inclusive * @return list of configuration mutations */ - List getConfirmedConfHistory(long fromId); + public abstract List getConfirmedConfHistory(long fromId); + + /** + * Get schema version of persisted conf store, for detecting compatibility + * issues when changing conf store schema. + * @return Schema version currently used by the persisted configuration store. + * @throws Exception On version fetch failure + */ + protected abstract Version getConfStoreVersion() throws Exception; + + /** + * Persist the hard-coded schema version to the conf store. + * @throws Exception On storage failure + */ + protected abstract void storeVersion() throws Exception; + + /** + * Get the hard-coded schema version, for comparison against the schema + * version currently persisted. + * @return Current hard-coded schema version + */ + protected abstract Version getCurrentVersion(); + + public void checkVersion() throws Exception { + // TODO this was taken from RMStateStore. Should probably refactor + Version loadedVersion = getConfStoreVersion(); + LOG.info("Loaded configuration store version info " + loadedVersion); + if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + return; + } + // if there is no version info, treat it as CURRENT_VERSION_INFO; + if (loadedVersion == null) { + loadedVersion = getCurrentVersion(); + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing configuration store version info " + + getCurrentVersion()); + storeVersion(); + } else { + throw new RMStateVersionIncompatibleException( + "Expecting configuration store version " + getCurrentVersion() + + ", but loading version " + loadedVersion); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java new file mode 100644 index 0000000..a0bba8c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A Zookeeper-based implementation of {@link YarnConfigurationStore}. + */ +public class ZKConfigurationStore extends YarnConfigurationStore { + + public static final Log LOG = + LogFactory.getLog(ZKConfigurationStore.class); + + private long maxLogs; + + @VisibleForTesting + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(0, 1); + private Configuration conf; + private LogMutation pendingMutation; + + private String znodeParentPath; + + private static final String ZK_VERSION_PATH = "VERSION"; + private static final String LOGS_PATH = "LOGS"; + private static final String CONF_STORE_PATH = "CONF_STORE"; + private static final String FENCING_PATH = "FENCING"; + + private String zkVersionPath; + private String logsPath; + private String confStorePath; + private String fencingNodePath; + + @VisibleForTesting + protected ZKCuratorManager zkManager; + private List zkAcl; + + @Override + public void initialize(Configuration config, Configuration schedConf, + RMContext rmContext) throws Exception { + this.conf = config; + this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS); + this.znodeParentPath = + conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + this.zkManager = rmContext.getResourceManager().getAndStartZKManager(conf); + this.zkAcl = ZKCuratorManager.getZKAcls(conf); + + this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH); + this.logsPath = getNodePath(znodeParentPath, LOGS_PATH); + this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH); + this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH); + + zkManager.createRootDirRecursively(znodeParentPath); + zkManager.delete(fencingNodePath); + + if (!zkManager.exists(logsPath)) { + zkManager.create(logsPath); + zkManager.setData(logsPath, + serializeObject(new LinkedList()), -1); + } + + if (!zkManager.exists(confStorePath)) { + zkManager.create(confStorePath); + HashMap mapSchedConf = new HashMap<>(); + for (Map.Entry entry : schedConf) { + mapSchedConf.put(entry.getKey(), entry.getValue()); + } + zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); + } + } + + @VisibleForTesting + protected LinkedList getLogs() throws Exception { + return (LinkedList) + deserializeObject(zkManager.getData(logsPath)); + } + + // TODO: following version-related code is taken from ZKRMStateStore + @Override + public Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + public Version getConfStoreVersion() throws Exception { + if (zkManager.exists(zkVersionPath)) { + byte[] data = zkManager.getData(zkVersionPath); + return new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + + return null; + } + + @Override + public synchronized void storeVersion() throws Exception { + byte[] data = + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + + if (zkManager.exists(zkVersionPath)) { + zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath); + } else { + zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); + } + } + + @Override + public void logMutation(LogMutation logMutation) throws Exception { + byte[] storedLogs = zkManager.getData(logsPath); + LinkedList logs = new LinkedList<>(); + if (storedLogs != null) { + logs = (LinkedList) deserializeObject(storedLogs); + } + logs.add(logMutation); + if (logs.size() > maxLogs) { + logs.remove(logs.removeFirst()); + } + zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, + fencingNodePath); + pendingMutation = logMutation; + } + + @Override + public void confirmMutation(boolean isValid) + throws Exception { + if (isValid) { + Configuration storedConfigs = retrieve(); + Map mapConf = new HashMap<>(); + for (Map.Entry storedConf : storedConfigs) { + mapConf.put(storedConf.getKey(), storedConf.getValue()); + } + for (Map.Entry confChange : + pendingMutation.getUpdates().entrySet()) { + if (confChange.getValue() == null || confChange.getValue().isEmpty()) { + mapConf.remove(confChange.getKey()); + } else { + mapConf.put(confChange.getKey(), confChange.getValue()); + } + } + zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, + zkAcl, fencingNodePath); + } + pendingMutation = null; + } + + @Override + public synchronized Configuration retrieve() { + byte[] serializedSchedConf; + try { + serializedSchedConf = zkManager.getData(confStorePath); + } catch (Exception e) { + LOG.error("Failed to retrieve configuration from zookeeper store", e); + return null; + } + try { + Map map = + (HashMap) deserializeObject(serializedSchedConf); + Configuration c = new Configuration(); + for (Map.Entry e : map.entrySet()) { + c.set(e.getKey(), e.getValue()); + } + return c; + } catch (Exception e) { + LOG.error("Exception while deserializing scheduler configuration " + + "from store", e); + } + return null; + } + + @Override + public List getConfirmedConfHistory(long fromId) { + return null; // unimplemented + } + + private static String getNodePath(String root, String nodeName) { + return ZKCuratorManager.getNodePath(root, nodeName); + } + + private static byte[] serializeObject(Object o) throws Exception { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos);) { + oos.writeObject(o); + oos.flush(); + baos.flush(); + return baos.toByteArray(); + } + } + + private static Object deserializeObject(byte[] bytes) throws Exception { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais);) { + return ois.readObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 1da4e65..d264c10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -136,6 +136,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -2464,7 +2465,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) - public Response updateSchedulerConfiguration(SchedConfUpdateInfo + public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo, @Context HttpServletRequest hsr) throws AuthorizationException, InterruptedException { init(); @@ -2479,17 +2480,32 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof MutableConfScheduler) { + if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) + scheduler).isConfigurationMutable()) { try { callerUGI.doAs(new PrivilegedExceptionAction() { @Override - public Void run() throws IOException, YarnException { - ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI, - mutationInfo); + public Void run() throws Exception { + MutableConfigurationProvider provider = ((MutableConfScheduler) + scheduler).getMutableConfProvider(); + if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI, + mutationInfo)) { + throw new org.apache.hadoop.security.AccessControlException("User" + + " is not admin of all modified queues."); + } + provider.logAndApplyMutation(callerUGI, mutationInfo); + try { + rm.getRMContext().getRMAdminService().refreshQueues(); + } catch (IOException | YarnException e) { + provider.confirmPendingMutation(false); + throw e; + } + provider.confirmPendingMutation(true); return null; } }); } catch (IOException e) { + LOG.error("Exception thrown when modifying configuration.", e); return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) .build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java new file mode 100644 index 0000000..bbe9570 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Base class for {@link YarnConfigurationStore} implementations. + */ +public abstract class ConfigurationStoreBaseTest { + + protected YarnConfigurationStore confStore = createConfStore(); + + protected abstract YarnConfigurationStore createConfStore(); + + protected Configuration conf; + protected Configuration schedConf; + protected RMContext rmContext; + + protected static final String TEST_USER = "testUser"; + + @Before + public void setUp() throws Exception { + this.conf = new Configuration(); + this.schedConf = new Configuration(false); + } + + @Test + public void testConfigurationUpdate() throws Exception { + schedConf.set("key1", "val1"); + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val1", confStore.retrieve().get("key1")); + + Map update1 = new HashMap<>(); + update1.put("keyUpdate1", "valUpdate1"); + YarnConfigurationStore.LogMutation mutation1 = + new YarnConfigurationStore.LogMutation(update1, TEST_USER); + confStore.logMutation(mutation1); + confStore.confirmMutation(true); + assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); + + Map update2 = new HashMap<>(); + update2.put("keyUpdate2", "valUpdate2"); + YarnConfigurationStore.LogMutation mutation2 = + new YarnConfigurationStore.LogMutation(update2, TEST_USER); + confStore.logMutation(mutation2); + confStore.confirmMutation(false); + assertNull("Configuration should not be updated", + confStore.retrieve().get("keyUpdate2")); + } + + @Test + public void testNullConfigurationUpdate() throws Exception { + schedConf.set("key", "val"); + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + + Map update = new HashMap<>(); + update.put("key", null); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, TEST_USER); + confStore.logMutation(mutation); + confStore.confirmMutation(true); + assertNull(confStore.retrieve().get("key")); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java new file mode 100644 index 0000000..c40d16a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +/** + * Tests {@link InMemoryConfigurationStore}. + */ +public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest { + + @Override + protected YarnConfigurationStore createConfStore() { + return new InMemoryConfigurationStore(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 635a184..9b080cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -30,14 +29,11 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -82,25 +78,21 @@ public class TestMutableCSConfigurationProvider { } @Test - public void testInMemoryBackedProvider() throws IOException, YarnException { + public void testInMemoryBackedProvider() throws Exception { Configuration conf = new Configuration(); confProvider.init(conf); assertNull(confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); - doNothing().when(adminService).refreshQueues(); - confProvider.mutateConfiguration(TEST_USER, goodUpdate); + confProvider.logAndApplyMutation(TEST_USER, goodUpdate); + confProvider.confirmPendingMutation(true); assertEquals("goodVal", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); - doThrow(new IOException()).when(adminService).refreshQueues(); - try { - confProvider.mutateConfiguration(TEST_USER, badUpdate); - } catch (IOException e) { - // Expected exception. - } + confProvider.logAndApplyMutation(TEST_USER, badUpdate); + confProvider.confirmPendingMutation(false); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fb0a58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java deleted file mode 100644 index 631ce65..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -public class TestYarnConfigurationStore { - - private YarnConfigurationStore confStore; - private Configuration schedConf; - - private static final String testUser = "testUser"; - - @Before - public void setUp() { - schedConf = new Configuration(false); - schedConf.set("key1", "val1"); - } - - @Test - public void testInMemoryConfigurationStore() throws IOException { - confStore = new InMemoryConfigurationStore(); - confStore.initialize(new Configuration(), schedConf); - assertEquals("val1", confStore.retrieve().get("key1")); - - Map update1 = new HashMap<>(); - update1.put("keyUpdate1", "valUpdate1"); - LogMutation mutation1 = new LogMutation(update1, testUser); - long id = confStore.logMutation(mutation1); - assertEquals(1, confStore.getPendingMutations().size()); - confStore.confirmMutation(id, true); - assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); - assertEquals(0, confStore.getPendingMutations().size()); - - Map update2 = new HashMap<>(); - update2.put("keyUpdate2", "valUpdate2"); - LogMutation mutation2 = new LogMutation(update2, testUser); - id = confStore.logMutation(mutation2); - assertEquals(1, confStore.getPendingMutations().size()); - confStore.confirmMutation(id, false); - assertNull("Configuration should not be updated", - confStore.retrieve().get("keyUpdate2")); - assertEquals(0, confStore.getPendingMutations().size()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org