hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sun...@apache.org
Subject [07/50] [abbrv] hadoop git commit: YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)
Date Thu, 12 Oct 2017 09:37:34 GMT
YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)

Change-Id: I9debea674fe8c7e4109d4ca136965a1ea4c48bcc


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff39c0de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff39c0de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff39c0de

Branch: refs/heads/YARN-5881
Commit: ff39c0de206a4fce1f0e8a416357a7a8261f8634
Parents: 4d8abd8
Author: Wangda Tan <wangda@apache.org>
Authored: Mon Sep 18 09:53:42 2017 -0700
Committer: Jonathan Hung <jhung@linkedin.com>
Committed: Mon Oct 9 11:11:23 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 +-
 .../src/main/resources/yarn-default.xml         |  15 +-
 .../server/resourcemanager/AdminService.java    |  18 +-
 .../server/resourcemanager/ResourceManager.java |  24 +-
 .../RMStateVersionIncompatibleException.java    |   2 +-
 .../recovery/ZKRMStateStore.java                |   5 +-
 .../scheduler/MutableConfScheduler.java         |  22 +-
 .../scheduler/MutableConfigurationProvider.java |  36 ++-
 .../scheduler/capacity/CapacityScheduler.java   |  22 +-
 .../conf/InMemoryConfigurationStore.java        |  71 +++--
 .../conf/LeveldbConfigurationStore.java         | 168 +++++-----
 .../conf/MutableCSConfigurationProvider.java    | 148 +++++----
 .../capacity/conf/YarnConfigurationStore.java   | 132 ++++----
 .../capacity/conf/ZKConfigurationStore.java     | 235 ++++++++++++++
 .../resourcemanager/webapp/RMWebServices.java   |  26 +-
 .../conf/ConfigurationStoreBaseTest.java        |  90 ++++++
 .../conf/TestInMemoryConfigurationStore.java    |  30 ++
 .../TestMutableCSConfigurationProvider.java     |  18 +-
 .../conf/TestYarnConfigurationStore.java        |  71 -----
 .../capacity/conf/TestZKConfigurationStore.java | 312 +++++++++++++++++++
 20 files changed, 1037 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ea8652d..8809a7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -678,6 +678,7 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "scheduler.configuration.store.class";
   public static final String MEMORY_CONFIGURATION_STORE = "memory";
   public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+  public static final String ZK_CONFIGURATION_STORE = "zk";
   public static final String DEFAULT_CONFIGURATION_STORE =
       MEMORY_CONFIGURATION_STORE;
   public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
@@ -689,9 +690,16 @@ public class YarnConfiguration extends Configuration {
   public static final long
       DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
 
-  public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
-      YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
-  public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
+  public static final String RM_SCHEDCONF_MAX_LOGS =
+      YARN_PREFIX + "scheduler.configuration.store.max-logs";
+  public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
+  public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000;
+
+  /** Parent znode path under which ZKConfigurationStore will create znodes. */
+  public static final String RM_SCHEDCONF_STORE_ZK_PARENT_PATH = YARN_PREFIX
+      + "scheduler.configuration.zk-store.parent-path";
+  public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH =
+      "/confstore";
 
   public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
       YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5afec1b..4516edf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3412,11 +3412,20 @@
 
   <property>
     <description>
-      The max number of configuration change log entries kept in LevelDB config
+      The max number of configuration change log entries kept in config
       store, when yarn.scheduler.configuration.store.class is configured to be
-      "leveldb". Default is 1000.
+      "leveldb" or "zk". Default is 1000 for either.
     </description>
-    <name>yarn.scheduler.configuration.leveldb-store.max-logs</name>
+    <name>yarn.scheduler.configuration.store.max-logs</name>
     <value>1000</value>
   </property>
+
+  <property>
+    <description>
+      ZK root node path for configuration store when using zookeeper-based
+      configuration store.
+    </description>
+    <name>yarn.scheduler.configuration.zk-store.parent-path</name>
+    <value>/confstore</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index fd9e849..6c0a854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -387,9 +387,7 @@ public class AdminService extends CompositeService implements
     RefreshQueuesResponse response =
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
-      ResourceScheduler scheduler = rm.getRMContext().getScheduler();
-      if (scheduler instanceof MutableConfScheduler
-          && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+      if (isSchedulerMutable()) {
         throw new IOException("Scheduler configuration is mutable. " +
             operation + " is not allowed in this scenario.");
       }
@@ -413,6 +411,12 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  private boolean isSchedulerMutable() {
+    ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+    return (scheduler instanceof MutableConfScheduler
+        && ((MutableConfScheduler) scheduler).isConfigurationMutable());
+  }
+
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws YarnException, StandbyException {
@@ -721,6 +725,14 @@ public class AdminService extends CompositeService implements
   void refreshAll() throws ServiceFailedException {
     try {
       checkAcls("refreshAll");
+      if (isSchedulerMutable()) {
+        try {
+          ((MutableConfScheduler) rm.getRMContext().getScheduler())
+              .getMutableConfProvider().reloadConfigurationFromStore();
+        } catch (Exception e) {
+          throw new IOException("Failed to refresh configuration:", e);
+        }
+      }
       refreshQueues();
       refreshNodes();
       refreshSuperUserGroupsConfiguration();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index fd95ee5..d9312f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -351,7 +351,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
         conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
             YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
     if (curatorEnabled) {
-      this.zkManager = createAndStartZKManager(conf);
+      this.zkManager = getAndStartZKManager(conf);
       elector = new CuratorBasedElectorService(this);
     } else {
       elector = new ActiveStandbyElectorBasedElectorService(this);
@@ -360,13 +360,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   /**
-   * Create and ZooKeeper Curator manager.
+   * Get the ZooKeeper Curator manager, creating and starting it if absent.
    * @param config Configuration for the ZooKeeper curator.
-   * @return New ZooKeeper Curator manager.
+   * @return ZooKeeper Curator manager.
    * @throws IOException If it cannot create the manager.
    */
-  public ZKCuratorManager createAndStartZKManager(Configuration config)
-      throws IOException {
+  public synchronized ZKCuratorManager getAndStartZKManager(Configuration
+      config) throws IOException {
+    if (this.zkManager != null) {
+      return zkManager;
+    }
     ZKCuratorManager manager = new ZKCuratorManager(config);
 
     // Get authentication
@@ -386,15 +389,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
 
     manager.start(authInfos);
-    return manager;
-  }
-
-  /**
-   * Get the ZooKeeper Curator manager.
-   * @return ZooKeeper Curator manager.
-   */
-  public ZKCuratorManager getZKManager() {
-    return this.zkManager;
+    this.zkManager = manager;
+    return zkManager;
   }
 
   public CuratorFramework getCurator() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
index 135868f..d5fce36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 
 /**
  * This exception is thrown by ResourceManager if it's loading an incompatible
- * version of state from state store on recovery.
+ * version of storage on recovery.
  */
 public class RMStateVersionIncompatibleException extends YarnException {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ac67dcd..5bff77f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -327,10 +327,7 @@ public class ZKRMStateStore extends RMStateStore {
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
-    zkManager = resourceManager.getZKManager();
-    if (zkManager == null) {
-      zkManager = resourceManager.createAndStartZKManager(conf);
-    }
+    zkManager = resourceManager.getAndStartZKManager(conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
index 313bf6a..6f677fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
@@ -18,11 +18,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
-
-import java.io.IOException;
 
 /**
  * Interface for a scheduler that supports changing configuration at runtime.
@@ -31,16 +26,6 @@ import java.io.IOException;
 public interface MutableConfScheduler extends ResourceScheduler {
 
   /**
-   * Update the scheduler's configuration.
-   * @param user Caller of this update
-   * @param confUpdate configuration update
-   * @throws IOException if scheduler could not be reinitialized
-   * @throws YarnException if reservation system could not be reinitialized
-   */
-  void updateConfiguration(UserGroupInformation user,
-      SchedConfUpdateInfo confUpdate) throws IOException, YarnException;
-
-  /**
    * Get the scheduler configuration.
    * @return the scheduler configuration
    */
@@ -58,4 +43,11 @@ public interface MutableConfScheduler extends ResourceScheduler {
    * @return whether scheduler configuration is mutable or not.
    */
   boolean isConfigurationMutable();
+
+  /**
+   * Get scheduler's configuration provider, so other classes can directly
+   * call mutation APIs on configuration provider.
+   * @return scheduler's configuration provider
+   */
+  MutableConfigurationProvider getMutableConfProvider();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 9baf1ad..f8e8814 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -19,30 +19,40 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 
-import java.io.IOException;
-
 /**
  * Interface for allowing changing scheduler configurations.
  */
 public interface MutableConfigurationProvider {
 
   /**
-   * Apply transactions which were not committed.
-   * @throws IOException if recovery fails
+   * Get the acl mutation policy for this configuration provider.
+   * @return The acl mutation policy.
+   */
+  ConfigurationMutationACLPolicy getAclMutationPolicy();
+
+  /**
+   * Called when a new ResourceManager is starting/becomes active. Ensures
+   * configuration is up-to-date.
+   * @throws Exception if configuration could not be refreshed from store
    */
-  void recoverConf() throws IOException;
+  void reloadConfigurationFromStore() throws Exception;
 
   /**
-   * Update the scheduler configuration with the provided key value pairs.
-   * @param user User issuing the request
-   * @param confUpdate Key-value pairs for configurations to be updated.
-   * @throws IOException if scheduler could not be reinitialized
-   * @throws YarnException if reservation system could not be reinitialized
+   * Log the user's requested configuration mutation and apply it in memory.
+   * @param user User who requested the change
+   * @param confUpdate User's requested configuration change
+   * @throws Exception if logging the mutation fails
    */
-  void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
-      confUpdate) throws IOException, YarnException;
+  void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
+      confUpdate) throws Exception;
 
+  /**
+   * Confirm last logged mutation.
+   * @param isValid whether the last logged mutation was applied to the
+   *                scheduler properly.
+   * @throws Exception if confirming mutation fails
+   */
+  void confirmPendingMutation(boolean isValid) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 51ee6a7..16b27c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -141,7 +141,6 @@ import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -393,9 +392,6 @@ public class CapacityScheduler extends
   @Override
   public void serviceStart() throws Exception {
     startSchedulerThreads();
-    if (this.csConfProvider instanceof MutableConfigurationProvider) {
-      ((MutableConfigurationProvider) csConfProvider).recoverConf();
-    }
     super.serviceStart();
   }
 
@@ -2619,19 +2615,15 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public void updateConfiguration(UserGroupInformation user,
-      SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
-    if (isConfigurationMutable()) {
-      ((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
-          user, confUpdate);
-    } else {
-      throw new UnsupportedOperationException("Configured CS configuration " +
-          "provider does not support updating configuration.");
-    }
+  public boolean isConfigurationMutable() {
+    return csConfProvider instanceof MutableConfigurationProvider;
   }
 
   @Override
-  public boolean isConfigurationMutable() {
-    return csConfProvider instanceof MutableConfigurationProvider;
+  public MutableConfigurationProvider getMutableConfProvider() {
+    if (isConfigurationMutable()) {
+      return (MutableConfigurationProvider) csConfProvider;
+    }
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index c63734d..d69c236 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -19,8 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -28,48 +29,35 @@ import java.util.Map;
  * A default implementation of {@link YarnConfigurationStore}. Doesn't offer
  * persistent configuration storage, just stores the configuration in memory.
  */
-public class InMemoryConfigurationStore implements YarnConfigurationStore {
+public class InMemoryConfigurationStore extends YarnConfigurationStore {
 
   private Configuration schedConf;
-  private LinkedList<LogMutation> pendingMutations;
-  private long pendingId;
+  private LogMutation pendingMutation;
 
   @Override
-  public void initialize(Configuration conf, Configuration schedConf) {
+  public void initialize(Configuration conf, Configuration schedConf,
+      RMContext rmContext) {
     this.schedConf = schedConf;
-    this.pendingMutations = new LinkedList<>();
-    this.pendingId = 0;
   }
 
   @Override
-  public synchronized long logMutation(LogMutation logMutation) {
-    logMutation.setId(++pendingId);
-    pendingMutations.add(logMutation);
-    return pendingId;
+  public void logMutation(LogMutation logMutation) {
+    pendingMutation = logMutation;
   }
 
   @Override
-  public synchronized boolean confirmMutation(long id, boolean isValid) {
-    LogMutation mutation = pendingMutations.poll();
-    // If confirmMutation is called out of order, discard mutations until id
-    // is reached.
-    while (mutation != null) {
-      if (mutation.getId() == id) {
-        if (isValid) {
-          Map<String, String> mutations = mutation.getUpdates();
-          for (Map.Entry<String, String> kv : mutations.entrySet()) {
-            if (kv.getValue() == null) {
-              schedConf.unset(kv.getKey());
-            } else {
-              schedConf.set(kv.getKey(), kv.getValue());
-            }
-          }
+  public void confirmMutation(boolean isValid) {
+    if (isValid) {
+      for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
+          .entrySet()) {
+        if (kv.getValue() == null) {
+          schedConf.unset(kv.getKey());
+        } else {
+          schedConf.set(kv.getKey(), kv.getValue());
         }
-        return true;
       }
-      mutation = pendingMutations.poll();
     }
-    return false;
+    pendingMutation = null;
   }
 
   @Override
@@ -78,13 +66,30 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore {
   }
 
   @Override
-  public synchronized List<LogMutation> getPendingMutations() {
-    return new LinkedList<>(pendingMutations);
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    // Unimplemented.
+    return null;
   }
 
   @Override
-  public List<LogMutation> getConfirmedConfHistory(long fromId) {
-    // Unimplemented.
+  public Version getConfStoreVersion() throws Exception {
+    // Does nothing.
     return null;
   }
+
+  @Override
+  public void storeVersion() throws Exception {
+    // Does nothing.
+  }
+
+  @Override
+  public Version getCurrentVersion() {
+    // Does nothing.
+    return null;
+  }
+
+  @Override
+  public void checkVersion() {
+    // Does nothing. (Version is always compatible since it's in memory)
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 1280fab..1b0eb9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -26,6 +26,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -55,58 +59,32 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 /**
  * A LevelDB implementation of {@link YarnConfigurationStore}.
  */
-public class LeveldbConfigurationStore implements YarnConfigurationStore {
+public class LeveldbConfigurationStore extends YarnConfigurationStore {
 
   public static final Log LOG =
       LogFactory.getLog(LeveldbConfigurationStore.class);
 
   private static final String DB_NAME = "yarn-conf-store";
-  private static final String LOG_PREFIX = "log.";
-  private static final String LOG_COMMITTED_TXN = "committedTxn";
+  private static final String LOG_KEY = "log";
+  private static final String VERSION_KEY = "version";
 
   private DB db;
-  // Txnid for the last transaction logged to the store.
-  private long txnId = 0;
-  private long minTxn = 0;
   private long maxLogs;
   private Configuration conf;
-  private LinkedList<LogMutation> pendingMutations = new LinkedList<>();
+  private LogMutation pendingMutation;
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(0, 1);
   private Timer compactionTimer;
   private long compactionIntervalMsec;
 
   @Override
-  public void initialize(Configuration config, Configuration schedConf)
-      throws IOException {
+  public void initialize(Configuration config, Configuration schedConf,
+      RMContext rmContext) throws IOException {
     this.conf = config;
     try {
       this.db = initDatabase(schedConf);
-      this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
-          StandardCharsets.UTF_8));
-      DBIterator itr = db.iterator();
-      itr.seek(bytes(LOG_PREFIX + txnId));
-      // Seek to first uncommitted log
-      itr.next();
-      while (itr.hasNext()) {
-        Map.Entry<byte[], byte[]> entry = itr.next();
-        if (!new String(entry.getKey(), StandardCharsets.UTF_8)
-            .startsWith(LOG_PREFIX)) {
-          break;
-        }
-        pendingMutations.add(deserLogMutation(entry.getValue()));
-        txnId++;
-      }
-      // Get the earliest txnId stored in logs
-      itr.seekToFirst();
-      if (itr.hasNext()) {
-        Map.Entry<byte[], byte[]> entry = itr.next();
-        byte[] key = entry.getKey();
-        String logId = new String(key, StandardCharsets.UTF_8);
-        if (logId.startsWith(LOG_PREFIX)) {
-          minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
-        }
-      }
       this.maxLogs = config.getLong(
-          YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
+          YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
           YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
       this.compactionIntervalMsec = config.getLong(
           YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
@@ -127,33 +105,23 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
       public int compare(byte[] key1, byte[] key2) {
         String key1Str = new String(key1, StandardCharsets.UTF_8);
         String key2Str = new String(key2, StandardCharsets.UTF_8);
-        int key1Txn = Integer.MAX_VALUE;
-        int key2Txn = Integer.MAX_VALUE;
-        if (key1Str.startsWith(LOG_PREFIX)) {
-          key1Txn = Integer.parseInt(key1Str.substring(
-              key1Str.indexOf('.') + 1));
-        }
-        if (key2Str.startsWith(LOG_PREFIX)) {
-          key2Txn = Integer.parseInt(key2Str.substring(
-              key2Str.indexOf('.') + 1));
-        }
-        // TODO txnId could overflow, in theory
-        if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
-          if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
-            return 0;
-          } else if (key1Str.equals(LOG_COMMITTED_TXN)) {
-            return -1;
-          } else if (key2Str.equals(LOG_COMMITTED_TXN)) {
-            return 1;
-          }
-          return key1Str.compareTo(key2Str);
+        if (key1Str.equals(key2Str)) {
+          return 0;
+        } else if (key1Str.equals(VERSION_KEY)) {
+          return -1;
+        } else if (key2Str.equals(VERSION_KEY)) {
+          return 1;
+        } else if (key1Str.equals(LOG_KEY)) {
+          return -1;
+        } else if (key2Str.equals(LOG_KEY)) {
+          return 1;
         }
-        return key1Txn - key2Txn;
+        return key1Str.compareTo(key2Str);
       }
 
       @Override
       public String name() {
-        return "logComparator";
+        return "keyComparator";
       }
 
       public byte[] findShortestSeparator(byte[] start, byte[] limit) {
@@ -164,6 +132,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
         return key;
       }
     });
+
     LOG.info("Using conf database at " + storeRoot);
     File dbfile = new File(storeRoot.toString());
     try {
@@ -179,7 +148,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
           for (Map.Entry<String, String> kv : config) {
             initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
           }
-          initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
           db.write(initBatch);
         } catch (DBException dbErr) {
           throw new IOException(dbErr.getMessage(), dbErr);
@@ -208,28 +176,22 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
   }
 
   @Override
-  public synchronized long logMutation(LogMutation logMutation)
-      throws IOException {
-    logMutation.setId(++txnId);
-    WriteBatch logBatch = db.createWriteBatch();
-    logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
-    if (txnId - minTxn >= maxLogs) {
-      logBatch.delete(bytes(LOG_PREFIX + minTxn));
-      minTxn++;
+  public void logMutation(LogMutation logMutation) throws IOException {
+    LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
+    logs.add(logMutation);
+    if (logs.size() > maxLogs) {
+      logs.removeFirst();
     }
-    db.write(logBatch);
-    pendingMutations.add(logMutation);
-    return txnId;
+    db.put(bytes(LOG_KEY), serLogMutations(logs));
+    pendingMutation = logMutation;
   }
 
   @Override
-  public synchronized boolean confirmMutation(long id, boolean isValid)
-      throws IOException {
+  public void confirmMutation(boolean isValid) throws IOException {
     WriteBatch updateBatch = db.createWriteBatch();
     if (isValid) {
-      LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
       for (Map.Entry<String, String> changes :
-          mutation.getUpdates().entrySet()) {
+          pendingMutation.getUpdates().entrySet()) {
         if (changes.getValue() == null || changes.getValue().isEmpty()) {
           updateBatch.delete(bytes(changes.getKey()));
         } else {
@@ -237,28 +199,24 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
         }
       }
     }
-    updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
     db.write(updateBatch);
-    // Assumes logMutation and confirmMutation are done in the same
-    // synchronized method. For example,
-    // {@link MutableCSConfigurationProvider#mutateConfiguration(
-    // UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
-    pendingMutations.removeFirst();
-    return true;
+    pendingMutation = null;
   }
 
-  private byte[] serLogMutation(LogMutation mutation) throws IOException {
+  private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
+      IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (ObjectOutput oos = new ObjectOutputStream(baos)) {
-      oos.writeObject(mutation);
+      oos.writeObject(mutations);
       oos.flush();
       return baos.toByteArray();
     }
   }
-  private LogMutation deserLogMutation(byte[] mutation) throws IOException {
+  private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
+      IOException {
     try (ObjectInput input = new ObjectInputStream(
-        new ByteArrayInputStream(mutation))) {
-      return (LogMutation) input.readObject();
+        new ByteArrayInputStream(mutations))) {
+      return (LinkedList<LogMutation>) input.readObject();
     } catch (ClassNotFoundException e) {
       throw new IOException(e);
     }
@@ -267,7 +225,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
   @Override
   public synchronized Configuration retrieve() {
     DBIterator itr = db.iterator();
-    itr.seek(bytes(LOG_COMMITTED_TXN));
+    itr.seek(bytes(LOG_KEY));
     Configuration config = new Configuration(false);
     itr.next();
     while (itr.hasNext()) {
@@ -279,11 +237,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
   }
 
   @Override
-  public List<LogMutation> getPendingMutations() {
-    return new LinkedList<>(pendingMutations);
-  }
-
-  @Override
   public List<LogMutation> getConfirmedConfHistory(long fromId) {
     return null; // unimplemented
   }
@@ -299,6 +252,39 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
     }
   }
 
+  // TODO: following is taken from LeveldbRMStateStore
+  @Override
+  public Version getConfStoreVersion() throws Exception {
+    Version version = null;
+    try {
+      byte[] data = db.get(bytes(VERSION_KEY));
+      if (data != null) {
+        version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
+            .parseFrom(data));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+    return version;
+  }
+
+  @Override
+  public void storeVersion() throws Exception {
+    String key = VERSION_KEY;
+    byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto()
+        .toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
   private class CompactionTimerTask extends TimerTask {
     @Override
     public void run() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index d03b2e2..70d1840 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -18,20 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
@@ -56,6 +53,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
       LogFactory.getLog(MutableCSConfigurationProvider.class);
 
   private Configuration schedConf;
+  private Configuration oldConf;
   private YarnConfigurationStore confStore;
   private ConfigurationMutationACLPolicy aclMutationPolicy;
   private RMContext rmContext;
@@ -76,6 +74,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
       this.confStore = new LeveldbConfigurationStore();
       break;
+    case YarnConfiguration.ZK_CONFIGURATION_STORE:
+      this.confStore = new ZKConfigurationStore();
+      break;
     default:
       this.confStore = YarnConfigurationStoreFactory.getStore(config);
       break;
@@ -89,7 +90,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     for (Map.Entry<String, String> kv : initialSchedConf) {
       schedConf.set(kv.getKey(), kv.getValue());
     }
-    confStore.initialize(config, schedConf);
+    try {
+      confStore.initialize(config, schedConf, rmContext);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
     // After initializing confStore, the store may already have an existing
     // configuration. Use this one.
     schedConf = confStore.retrieve();
@@ -98,6 +103,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     aclMutationPolicy.init(config, rmContext);
   }
 
+  @VisibleForTesting
+  public YarnConfigurationStore getConfStore() {
+    return confStore;
+  }
+
   @Override
   public CapacitySchedulerConfiguration loadConfiguration(Configuration
       configuration) throws IOException {
@@ -107,16 +117,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @Override
-  public synchronized void mutateConfiguration(UserGroupInformation user,
-      SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
-    if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
-      throw new AccessControlException("User is not admin of all modified" +
-          " queues.");
-    }
-    Configuration oldConf = new Configuration(schedConf);
+  public ConfigurationMutationACLPolicy getAclMutationPolicy() {
+    return aclMutationPolicy;
+  }
+
+  @Override
+  public void logAndApplyMutation(UserGroupInformation user,
+      SchedConfUpdateInfo confUpdate) throws Exception {
+    oldConf = new Configuration(schedConf);
     Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
     LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
-    long id = confStore.logMutation(log);
+    confStore.logMutation(log);
     for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
       if (kv.getValue() == null) {
         schedConf.unset(kv.getKey());
@@ -124,47 +135,33 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
         schedConf.set(kv.getKey(), kv.getValue());
       }
     }
-    try {
-      rmContext.getRMAdminService().refreshQueues();
-    } catch (IOException | YarnException e) {
+  }
+
+  @Override
+  public void confirmPendingMutation(boolean isValid) throws Exception {
+    confStore.confirmMutation(isValid);
+    if (!isValid) {
       schedConf = oldConf;
-      confStore.confirmMutation(id, false);
-      throw e;
     }
-    confStore.confirmMutation(id, true);
   }
 
   @Override
-  public void recoverConf() throws IOException {
-    List<LogMutation> uncommittedLogs = confStore.getPendingMutations();
-    Configuration oldConf = new Configuration(schedConf);
-    for (LogMutation mutation : uncommittedLogs) {
-      for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
-        if (kv.getValue() == null) {
-          schedConf.unset(kv.getKey());
-        } else {
-          schedConf.set(kv.getKey(), kv.getValue());
-        }
-      }
-      try {
-        rmContext.getScheduler().reinitialize(schedConf, rmContext);
-      } catch (IOException e) {
-        schedConf = oldConf;
-        confStore.confirmMutation(mutation.getId(), false);
-        LOG.info("Configuration mutation " + mutation.getId()
-            + " was rejected", e);
-        continue;
-      }
-      confStore.confirmMutation(mutation.getId(), true);
-      LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
-    }
+  public void reloadConfigurationFromStore() throws Exception {
+    schedConf = confStore.retrieve();
+  }
+
+  private List<String> getSiblingQueues(String queuePath, Configuration conf) {
+    String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
+    String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
+        parentQueue + CapacitySchedulerConfiguration.DOT +
+        CapacitySchedulerConfiguration.QUEUES;
+    return new ArrayList<>(conf.getStringCollection(childQueuesKey));
   }
 
   private Map<String, String> constructKeyValueConfUpdate(
       SchedConfUpdateInfo mutationInfo) throws IOException {
-    CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
     CapacitySchedulerConfiguration proposedConf =
-        new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
+        new CapacitySchedulerConfiguration(schedConf, false);
     Map<String, String> confUpdate = new HashMap<>();
     for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
       removeQueue(queueToRemove, proposedConf, confUpdate);
@@ -188,40 +185,35 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     if (queueToRemove == null) {
       return;
     } else {
-      CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
       String queueName = queueToRemove.substring(
           queueToRemove.lastIndexOf('.') + 1);
-      CSQueue queue = cs.getQueue(queueName);
-      if (queue == null ||
-          !queue.getQueuePath().equals(queueToRemove)) {
-        throw new IOException("Queue " + queueToRemove + " not found");
-      } else if (queueToRemove.lastIndexOf('.') == -1) {
+      if (queueToRemove.lastIndexOf('.') == -1) {
         throw new IOException("Can't remove queue " + queueToRemove);
-      }
-      String parentQueuePath = queueToRemove.substring(0, queueToRemove
-          .lastIndexOf('.'));
-      String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
-      List<String> newSiblingQueues = new ArrayList<>();
-      for (String siblingQueue : siblingQueues) {
-        if (!siblingQueue.equals(queueName)) {
-          newSiblingQueues.add(siblingQueue);
-        }
-      }
-      proposedConf.setQueues(parentQueuePath, newSiblingQueues
-          .toArray(new String[0]));
-      String queuesConfig = CapacitySchedulerConfiguration.PREFIX
-          + parentQueuePath + CapacitySchedulerConfiguration.DOT
-          + CapacitySchedulerConfiguration.QUEUES;
-      if (newSiblingQueues.size() == 0) {
-        confUpdate.put(queuesConfig, null);
       } else {
-        confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
-      }
-      for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
-          ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
-          .entrySet()) {
-        proposedConf.unset(confRemove.getKey());
-        confUpdate.put(confRemove.getKey(), null);
+        List<String> siblingQueues = getSiblingQueues(queueToRemove,
+            proposedConf);
+        if (!siblingQueues.contains(queueName)) {
+          throw new IOException("Queue " + queueToRemove + " not found");
+        }
+        siblingQueues.remove(queueName);
+        String parentQueuePath = queueToRemove.substring(0, queueToRemove
+            .lastIndexOf('.'));
+        proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
+            new String[0]));
+        String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+            + parentQueuePath + CapacitySchedulerConfiguration.DOT
+            + CapacitySchedulerConfiguration.QUEUES;
+        if (siblingQueues.size() == 0) {
+          confUpdate.put(queuesConfig, null);
+        } else {
+          confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
+        }
+        for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
+            ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
+            .entrySet()) {
+          proposedConf.unset(confRemove.getKey());
+          confUpdate.put(confRemove.getKey(), null);
+        }
       }
     }
   }
@@ -232,13 +224,13 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     if (addInfo == null) {
       return;
     } else {
-      CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
       String queuePath = addInfo.getQueue();
       String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
-      if (cs.getQueue(queueName) != null) {
-        throw new IOException("Can't add existing queue " + queuePath);
-      } else if (queuePath.lastIndexOf('.') == -1) {
+      if (queuePath.lastIndexOf('.') == -1) {
         throw new IOException("Can't add invalid queue " + queuePath);
+      } else if (getSiblingQueues(queuePath, proposedConf).contains(
+          queueName)) {
+        throw new IOException("Can't add existing queue " + queuePath);
       }
       String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
       String[] siblings = proposedConf.getQueues(parentQueue);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 065c877..1356535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -18,7 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 
 import java.io.IOException;
@@ -39,36 +44,26 @@ import java.util.Map;
  * {@code getPendingMutations}, and replay/confirm them via
  * {@code confirmMutation} as in the normal case.
  */
-public interface YarnConfigurationStore {
+public abstract class YarnConfigurationStore {
 
+  public static final Log LOG =
+      LogFactory.getLog(YarnConfigurationStore.class);
   /**
    * LogMutation encapsulates the fields needed for configuration mutation
    * audit logging and recovery.
    */
-  class LogMutation implements Serializable {
+  static class LogMutation implements Serializable {
     private Map<String, String> updates;
     private String user;
-    private long id;
 
     /**
-     * Create log mutation prior to logging.
+     * Create log mutation.
      * @param updates key-value configuration updates
      * @param user user who requested configuration change
      */
-    public LogMutation(Map<String, String> updates, String user) {
-      this(updates, user, 0);
-    }
-
-    /**
-     * Create log mutation for recovery.
-     * @param updates key-value configuration updates
-     * @param user user who requested configuration change
-     * @param id transaction id of configuration change
-     */
-    LogMutation(Map<String, String> updates, String user, long id) {
+    LogMutation(Map<String, String> updates, String user) {
       this.updates = updates;
       this.user = user;
-      this.id = id;
     }
 
     /**
@@ -86,75 +81,92 @@ public interface YarnConfigurationStore {
     public String getUser() {
       return user;
     }
-
-    /**
-     * Get transaction id of this configuration change.
-     * @return transaction id
-     */
-    public long getId() {
-      return id;
-    }
-
-    /**
-     * Set transaction id of this configuration change.
-     * @param id transaction id
-     */
-    public void setId(long id) {
-      this.id = id;
-    }
   }
 
   /**
-   * Initialize the configuration store.
+   * Initialize the configuration store, with schedConf as the initial
+   * scheduler configuration. If a persisted store already exists, use the
+   * scheduler configuration stored there, and ignore schedConf.
    * @param conf configuration to initialize store with
-   * @param schedConf Initial key-value configuration to persist
+   * @param schedConf Initial key-value scheduler configuration to persist.
+   * @param rmContext RMContext for this configuration store
    * @throws IOException if initialization fails
    */
-  void initialize(Configuration conf, Configuration schedConf)
-      throws IOException;
+  public abstract void initialize(Configuration conf, Configuration schedConf,
+      RMContext rmContext) throws Exception;
 
   /**
-   * Logs the configuration change to backing store. Generates an id associated
-   * with this mutation, sets it in {@code logMutation}, and returns it.
+   * Logs the configuration change to backing store.
    * @param logMutation configuration change to be persisted in write ahead log
-   * @return id which configuration store associates with this mutation
    * @throws IOException if logging fails
    */
-  long logMutation(LogMutation logMutation) throws IOException;
+  public abstract void logMutation(LogMutation logMutation) throws Exception;
 
   /**
    * Should be called after {@code logMutation}. Gets the pending mutation
-   * associated with {@code id} and marks the mutation as persisted (no longer
-   * pending). If isValid is true, merge the mutation with the persisted
+   * last logged by {@code logMutation} and marks the mutation as persisted (no
+   * longer pending). If isValid is true, merge the mutation with the persisted
    * configuration.
-   *
-   * If {@code confirmMutation} is called with ids in a different order than
-   * was returned by {@code logMutation}, the result is implementation
-   * dependent.
-   * @param id id of mutation to be confirmed
-   * @param isValid if true, update persisted configuration with mutation
-   *                associated with {@code id}.
-   * @return true on success
-   * @throws IOException if mutation confirmation fails
+   * @param isValid if true, update persisted configuration with pending
+   *                mutation.
+   * @throws Exception if mutation confirmation fails
    */
-  boolean confirmMutation(long id, boolean isValid) throws IOException;
+  public abstract void confirmMutation(boolean isValid) throws Exception;
 
   /**
    * Retrieve the persisted configuration.
    * @return configuration as key-value
    */
-  Configuration retrieve();
-
-  /**
-   * Get the list of pending mutations, in the order they were logged.
-   * @return list of mutations
-   */
-  List<LogMutation> getPendingMutations();
+  public abstract Configuration retrieve();
 
   /**
    * Get a list of confirmed configuration mutations starting from a given id.
    * @param fromId id from which to start getting mutations, inclusive
    * @return list of configuration mutations
    */
-  List<LogMutation> getConfirmedConfHistory(long fromId);
+  public abstract List<LogMutation> getConfirmedConfHistory(long fromId);
+
+  /**
+   * Get schema version of persisted conf store, for detecting compatibility
+   * issues when changing conf store schema.
+   * @return Schema version currently used by the persisted configuration store.
+   * @throws Exception On version fetch failure
+   */
+  protected abstract Version getConfStoreVersion() throws Exception;
+
+  /**
+   * Persist the hard-coded schema version to the conf store.
+   * @throws Exception On storage failure
+   */
+  protected abstract void storeVersion() throws Exception;
+
+  /**
+   * Get the hard-coded schema version, for comparison against the schema
+   * version currently persisted.
+   * @return Current hard-coded schema version
+   */
+  protected abstract Version getCurrentVersion();
+
+  public void checkVersion() throws Exception {
+    // TODO this was taken from RMStateStore. Should probably refactor
+    Version loadedVersion = getConfStoreVersion();
+    LOG.info("Loaded configuration store version info " + loadedVersion);
+    if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    // if there is no version info, treat it as CURRENT_VERSION_INFO;
+    if (loadedVersion == null) {
+      loadedVersion = getCurrentVersion();
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing configuration store version info "
+          + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new RMStateVersionIncompatibleException(
+          "Expecting configuration store version " + getCurrentVersion()
+              + ", but loading version " + loadedVersion);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
new file mode 100644
index 0000000..a0bba8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Zookeeper-based implementation of {@link YarnConfigurationStore}.
+ */
+public class ZKConfigurationStore extends YarnConfigurationStore {
+
+  public static final Log LOG =
+      LogFactory.getLog(ZKConfigurationStore.class);
+
+  private long maxLogs;
+
+  @VisibleForTesting
+  protected static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(0, 1);
+  private Configuration conf;
+  private LogMutation pendingMutation;
+
+  private String znodeParentPath;
+
+  private static final String ZK_VERSION_PATH = "VERSION";
+  private static final String LOGS_PATH = "LOGS";
+  private static final String CONF_STORE_PATH = "CONF_STORE";
+  private static final String FENCING_PATH = "FENCING";
+
+  private String zkVersionPath;
+  private String logsPath;
+  private String confStorePath;
+  private String fencingNodePath;
+
+  @VisibleForTesting
+  protected ZKCuratorManager zkManager;
+  private List<ACL> zkAcl;
+
+  @Override
+  public void initialize(Configuration config, Configuration schedConf,
+      RMContext rmContext) throws Exception {
+    this.conf = config;
+    this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
+        YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
+    this.znodeParentPath =
+        conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+            YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
+    this.zkManager = rmContext.getResourceManager().getAndStartZKManager(conf);
+    this.zkAcl = ZKCuratorManager.getZKAcls(conf);
+
+    this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH);
+    this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
+    this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
+    this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
+
+    zkManager.createRootDirRecursively(znodeParentPath);
+    zkManager.delete(fencingNodePath);
+
+    if (!zkManager.exists(logsPath)) {
+      zkManager.create(logsPath);
+      zkManager.setData(logsPath,
+          serializeObject(new LinkedList<LogMutation>()), -1);
+    }
+
+    if (!zkManager.exists(confStorePath)) {
+      zkManager.create(confStorePath);
+      HashMap<String, String> mapSchedConf = new HashMap<>();
+      for (Map.Entry<String, String> entry : schedConf) {
+        mapSchedConf.put(entry.getKey(), entry.getValue());
+      }
+      zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
+    }
+  }
+
+  @VisibleForTesting
+  protected LinkedList<LogMutation> getLogs() throws Exception {
+    return (LinkedList<LogMutation>)
+        deserializeObject(zkManager.getData(logsPath));
+  }
+
+  // TODO: following version-related code is taken from ZKRMStateStore
+  @Override
+  public Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  @Override
+  public Version getConfStoreVersion() throws Exception {
+    if (zkManager.exists(zkVersionPath)) {
+      byte[] data = zkManager.getData(zkVersionPath);
+      return new VersionPBImpl(YarnServerCommonProtos.VersionProto
+          .parseFrom(data));
+    }
+
+    return null;
+  }
+
+  @Override
+  public synchronized void storeVersion() throws Exception {
+    byte[] data =
+        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+
+    if (zkManager.exists(zkVersionPath)) {
+      zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath);
+    } else {
+      zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT,
+          zkAcl, fencingNodePath);
+    }
+  }
+
+  @Override
+  public void logMutation(LogMutation logMutation) throws Exception {
+    byte[] storedLogs = zkManager.getData(logsPath);
+    LinkedList<LogMutation> logs = new LinkedList<>();
+    if (storedLogs != null) {
+      logs = (LinkedList<LogMutation>) deserializeObject(storedLogs);
+    }
+    logs.add(logMutation);
+    if (logs.size() > maxLogs) {
+      logs.remove(logs.removeFirst());
+    }
+    zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
+        fencingNodePath);
+    pendingMutation = logMutation;
+  }
+
+  @Override
+  public void confirmMutation(boolean isValid)
+      throws Exception {
+    if (isValid) {
+      Configuration storedConfigs = retrieve();
+      Map<String, String> mapConf = new HashMap<>();
+      for (Map.Entry<String, String> storedConf : storedConfigs) {
+        mapConf.put(storedConf.getKey(), storedConf.getValue());
+      }
+      for (Map.Entry<String, String> confChange :
+          pendingMutation.getUpdates().entrySet()) {
+        if (confChange.getValue() == null || confChange.getValue().isEmpty()) {
+          mapConf.remove(confChange.getKey());
+        } else {
+          mapConf.put(confChange.getKey(), confChange.getValue());
+        }
+      }
+      zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
+          zkAcl, fencingNodePath);
+    }
+    pendingMutation = null;
+  }
+
+  @Override
+  public synchronized Configuration retrieve() {
+    byte[] serializedSchedConf;
+    try {
+      serializedSchedConf = zkManager.getData(confStorePath);
+    } catch (Exception e) {
+      LOG.error("Failed to retrieve configuration from zookeeper store", e);
+      return null;
+    }
+    try {
+      Map<String, String> map =
+          (HashMap<String, String>) deserializeObject(serializedSchedConf);
+      Configuration c = new Configuration();
+      for (Map.Entry<String, String> e : map.entrySet()) {
+        c.set(e.getKey(), e.getValue());
+      }
+      return c;
+    } catch (Exception e) {
+      LOG.error("Exception while deserializing scheduler configuration " +
+          "from store", e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    return null; // unimplemented
+  }
+
+  private static String getNodePath(String root, String nodeName) {
+    return ZKCuratorManager.getNodePath(root, nodeName);
+  }
+
+  private static byte[] serializeObject(Object o) throws Exception {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);) {
+      oos.writeObject(o);
+      oos.flush();
+      baos.flush();
+      return baos.toByteArray();
+    }
+  }
+
+  private static Object deserializeObject(byte[] bytes) throws Exception {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais);) {
+      return ois.readObject();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 1da4e65..d264c10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -136,6 +136,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -2464,7 +2465,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
   @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
       MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-  public Response updateSchedulerConfiguration(SchedConfUpdateInfo
+  public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
       mutationInfo, @Context HttpServletRequest hsr)
       throws AuthorizationException, InterruptedException {
     init();
@@ -2479,17 +2480,32 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
     }
 
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    if (scheduler instanceof MutableConfScheduler) {
+    if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
+        scheduler).isConfigurationMutable()) {
       try {
         callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
-          public Void run() throws IOException, YarnException {
-            ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI,
-                mutationInfo);
+          public Void run() throws Exception {
+            MutableConfigurationProvider provider = ((MutableConfScheduler)
+                scheduler).getMutableConfProvider();
+            if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
+                mutationInfo)) {
+              throw new org.apache.hadoop.security.AccessControlException("User"
+                  + " is not admin of all modified queues.");
+            }
+            provider.logAndApplyMutation(callerUGI, mutationInfo);
+            try {
+              rm.getRMContext().getRMAdminService().refreshQueues();
+            } catch (IOException | YarnException e) {
+              provider.confirmPendingMutation(false);
+              throw e;
+            }
+            provider.confirmPendingMutation(true);
             return null;
           }
         });
       } catch (IOException e) {
+        LOG.error("Exception thrown when modifying configuration.", e);
         return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
             .build();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
new file mode 100644
index 0000000..bbe9570
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Shared test cases for {@link YarnConfigurationStore} implementations.
+ * Concrete subclasses provide the store under test through
+ * {@link #createConfStore()}.
+ */
+public abstract class ConfigurationStoreBaseTest {
+
+  protected static final String TEST_USER = "testUser";
+
+  protected YarnConfigurationStore confStore = createConfStore();
+
+  protected Configuration conf;
+  protected Configuration schedConf;
+  protected RMContext rmContext;
+
+  /**
+   * @return the store instance the inherited test cases run against
+   */
+  protected abstract YarnConfigurationStore createConfStore();
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    this.schedConf = new Configuration(false);
+  }
+
+  /**
+   * A confirmed mutation must become visible through retrieve(), while a
+   * rejected one must leave the stored configuration untouched.
+   */
+  @Test
+  public void testConfigurationUpdate() throws Exception {
+    schedConf.set("key1", "val1");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val1", confStore.retrieve().get("key1"));
+
+    logSingleUpdate("keyUpdate1", "valUpdate1");
+    confStore.confirmMutation(true);
+    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
+
+    logSingleUpdate("keyUpdate2", "valUpdate2");
+    confStore.confirmMutation(false);
+    assertNull("Configuration should not be updated",
+        confStore.retrieve().get("keyUpdate2"));
+  }
+
+  /**
+   * A confirmed mutation that maps an existing key to null must remove that
+   * key from the stored configuration.
+   */
+  @Test
+  public void testNullConfigurationUpdate() throws Exception {
+    schedConf.set("key", "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+
+    logSingleUpdate("key", null);
+    confStore.confirmMutation(true);
+    assertNull(confStore.retrieve().get("key"));
+  }
+
+  /**
+   * Logs a mutation made of the single entry {@code key -> value} on behalf
+   * of {@link #TEST_USER}.
+   */
+  private void logSingleUpdate(String key, String value) throws Exception {
+    Map<String, String> changes = new HashMap<>();
+    changes.put(key, value);
+    confStore.logMutation(
+        new YarnConfigurationStore.LogMutation(changes, TEST_USER));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
new file mode 100644
index 0000000..c40d16a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+/**
+ * Tests {@link InMemoryConfigurationStore}.
+ *
+ * <p>Declares no test cases of its own: it only overrides
+ * {@link #createConfStore()} so that the cases inherited from
+ * {@link ConfigurationStoreBaseTest} run against a new
+ * {@link InMemoryConfigurationStore}.
+ */
+public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {
+
+  @Override
+  protected YarnConfigurationStore createConfStore() {
+    return new InMemoryConfigurationStore();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 635a184..9b080cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -30,14 +29,11 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -82,25 +78,21 @@ public class TestMutableCSConfigurationProvider {
   }
 
   @Test
-  public void testInMemoryBackedProvider() throws IOException, YarnException {
+  public void testInMemoryBackedProvider() throws Exception {
     Configuration conf = new Configuration();
     confProvider.init(conf);
     assertNull(confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
-    doNothing().when(adminService).refreshQueues();
-    confProvider.mutateConfiguration(TEST_USER, goodUpdate);
+    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
-    doThrow(new IOException()).when(adminService).refreshQueues();
-    try {
-      confProvider.mutateConfiguration(TEST_USER, badUpdate);
-    } catch (IOException e) {
-      // Expected exception.
-    }
+    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(false);
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff39c0de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
deleted file mode 100644
index 631ce65..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestYarnConfigurationStore {
-
-  private YarnConfigurationStore confStore;
-  private Configuration schedConf;
-
-  private static final String testUser = "testUser";
-
-  @Before
-  public void setUp() {
-    schedConf = new Configuration(false);
-    schedConf.set("key1", "val1");
-  }
-
-  @Test
-  public void testInMemoryConfigurationStore() throws IOException {
-    confStore = new InMemoryConfigurationStore();
-    confStore.initialize(new Configuration(), schedConf);
-    assertEquals("val1", confStore.retrieve().get("key1"));
-
-    Map<String, String> update1 = new HashMap<>();
-    update1.put("keyUpdate1", "valUpdate1");
-    LogMutation mutation1 = new LogMutation(update1, testUser);
-    long id = confStore.logMutation(mutation1);
-    assertEquals(1, confStore.getPendingMutations().size());
-    confStore.confirmMutation(id, true);
-    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
-    assertEquals(0, confStore.getPendingMutations().size());
-
-    Map<String, String> update2 = new HashMap<>();
-    update2.put("keyUpdate2", "valUpdate2");
-    LogMutation mutation2 = new LogMutation(update2, testUser);
-    id = confStore.logMutation(mutation2);
-    assertEquals(1, confStore.getPendingMutations().size());
-    confStore.confirmMutation(id, false);
-    assertNull("Configuration should not be updated",
-        confStore.retrieve().get("keyUpdate2"));
-    assertEquals(0, confStore.getPendingMutations().size());
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message