hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xg...@apache.org
Subject [14/18] hadoop git commit: YARN-7252. Removing queue then failing over results in exception
Date Mon, 09 Oct 2017 21:05:41 GMT
YARN-7252. Removing queue then failing over results in exception


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4bd3f0df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4bd3f0df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4bd3f0df

Branch: refs/heads/branch-2
Commit: 4bd3f0df42c3f4f2dc1f0a05f9f97d89e2e66518
Parents: 06c8938
Author: Jonathan Hung <jhung@linkedin.com>
Authored: Tue Sep 26 11:41:05 2017 -0700
Committer: Jonathan Hung <jhung@linkedin.com>
Committed: Mon Oct 9 11:12:08 2017 -0700

----------------------------------------------------------------------
 .../capacity/CapacitySchedulerContext.java      |   6 ++
 .../capacity/CapacitySchedulerQueueManager.java |  11 +-
 .../capacity/conf/TestZKConfigurationStore.java | 102 +++++++++++++++++++
 3 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bd3f0df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 9aeaec6..7c918a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -88,4 +88,10 @@ public interface CapacitySchedulerContext {
    * @return Max Cluster level App priority.
    */
   Priority getMaxClusterLevelAppPriority();
+
+  /**
+   * Returns whether the configuration is mutable.
+   * @return {@code true} if the configuration is mutable
+   */
+  boolean isConfigurationMutable();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bd3f0df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 1ceb6fb..48c289f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueState;
@@ -170,8 +171,14 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
     CSQueue newRoot =  parseQueue(this.csContext, newConf, null,
         CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
 
-    // Ensure queue hiearchy in the new XML file is proper.
-    validateQueueHierarchy(queues, newQueues);
+    // When failing over, if using configuration store, don't validate queue
+    // hierarchy since queues can be removed without being STOPPED.
+    if (!csContext.isConfigurationMutable() ||
+        csContext.getRMContext().getHAServiceState()
+            != HAServiceProtocol.HAServiceState.STANDBY) {
+      // Ensure queue hierarchy in the new XML file is proper.
+      validateQueueHierarchy(queues, newQueues);
+    }
 
     // Add new queues and delete OldQeueus only after validation.
     updateQueues(queues, newQueues);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bd3f0df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 355f741..0cf5e6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -38,18 +38,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfSchedu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests {@link ZKConfigurationStore}.
@@ -303,6 +306,105 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     rm2.close();
   }
 
+  /**
+   * When failing over, if RM1 stopped and removed a queue that RM2 has in
+   * memory, failing over to RM2 should not throw an exception.
+   * @throws Exception
+   */
+  @Test
+  public void testFailoverAfterRemoveQueue() throws Exception {
+    HAServiceProtocol.StateChangeRequestInfo req =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new MockRM(conf1);
+    rm1.start();
+    rm1.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm1.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+    ResourceManager rm2 = new MockRM(conf2);
+    rm2.start();
+    assertEquals("RM should be Standby",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    UserGroupInformation user = UserGroupInformation
+        .createUserForTesting(TEST_USER, new String[0]);
+    MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+        rm1.getResourceScheduler()).getMutableConfProvider();
+    // Add root.a
+    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+    Map<String, String> addParams = new HashMap<>();
+    addParams.put("capacity", "100");
+    QueueConfigInfo addInfo = new QueueConfigInfo("root.a", addParams);
+    schedConfUpdateInfo.getAddQueueInfo().add(addInfo);
+    // Stop root.default
+    Map<String, String> stopParams = new HashMap<>();
+    stopParams.put("state", "STOPPED");
+    stopParams.put("capacity", "0");
+    QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
+    schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+    confProvider.confirmPendingMutation(true);
+    assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("yarn.scheduler.capacity.root.queues").split
+            (",")).contains("a"));
+
+    // Remove root.default
+    schedConfUpdateInfo.getUpdateQueueInfo().clear();
+    schedConfUpdateInfo.getAddQueueInfo().clear();
+    schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+    confProvider.confirmPendingMutation(true);
+    assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+
+    // Transition RM2 to active and verify it has the updated configuration
+    rm2.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm2.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
+      if (HAServiceProtocol.HAServiceState.ACTIVE ==
+          rm1.getRMContext().getRMAdminService().getServiceStatus()
+              .getState()) {
+        Thread.sleep(100);
+      }
+    }
+    assertEquals("RM should have been fenced",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    assertEquals("a", ((MutableCSConfigurationProvider) (
+        (CapacityScheduler) rm2.getResourceScheduler())
+        .getMutableConfProvider()).getConfStore().retrieve()
+        .get("yarn.scheduler.capacity.root.queues"));
+    assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler())
+        .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+    // Transition to standby will set RM's HA status and then reinitialize in
+    // a separate thread. Despite asserting for STANDBY state, it's
+    // possible for reinitialization to be unfinished. Wait here for it to
+    // finish, otherwise closing rm1 will close zkManager and the unfinished
+    // reinitialization will throw an exception.
+    Thread.sleep(10000);
+    rm1.close();
+    rm2.close();
+  }
+
   @Override
   public YarnConfigurationStore createConfStore() {
     return new ZKConfigurationStore();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message