hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject hadoop git commit: YARN-7003. DRAINING state of queues is not recovered after RM restart. Contributed by Tao Yang.
Date Wed, 16 May 2018 13:48:02 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.9 e4355b942 -> 67927322d


YARN-7003. DRAINING state of queues is not recovered after RM restart. Contributed by Tao Yang.

(Cherry picked from commit 9db9cd95bd0348070a286e69e7965c03c9bd39d6)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67927322
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67927322
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67927322

Branch: refs/heads/branch-2.9
Commit: 67927322dfbef6e240ea62b857a9eebf4c8d552c
Parents: e4355b9
Author: Weiwei Yang <wwei@apache.org>
Authored: Fri May 11 10:47:04 2018 +0800
Committer: Weiwei Yang <wwei@apache.org>
Committed: Wed May 16 21:39:35 2018 +0800

----------------------------------------------------------------------
 .../scheduler/capacity/AbstractCSQueue.java     | 15 +++++
 .../scheduler/capacity/CapacityScheduler.java   |  7 +++
 .../scheduler/capacity/TestQueueState.java      | 60 ++++++++++++++++++++
 3 files changed, 82 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67927322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index e305085..8ba0200 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1046,4 +1046,19 @@ public abstract class AbstractCSQueue implements CSQueue {
   public Map<String, Float> getUserWeights() {
     return userWeights;
   }
+
+  public void recoverDrainingState() {
+    try {
+      this.writeLock.lock();
+      if (getState() == QueueState.STOPPED) {
+        updateQueueState(QueueState.DRAINING);
+      }
+      LOG.info("Recover draining state for queue " + this.getQueuePath());
+      if (getParent() != null && getParent().getState() == QueueState.STOPPED) {
+        ((AbstractCSQueue) getParent()).recoverDrainingState();
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67927322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ab60d8a..65f2cba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -714,6 +715,12 @@ public class CapacityScheduler extends
           throw new QueueInvalidException(queueErrorMsg);
         }
       }
+      // When recovering apps in this queue but queue is in STOPPED state,
+      // that means its previous state was DRAINING. So we auto transit
+      // the state to DRAINING for recovery.
+      if (queue.getState() == QueueState.STOPPED) {
+        ((LeafQueue) queue).recoverDrainingState();
+      }
       // Submit to the queue
       try {
         queue.submitApplication(applicationId, user, queueName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67927322/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
index 9f2933e..0a39e99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -32,7 +32,12 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -197,4 +202,59 @@ public class TestQueueState {
         .thenCallRealMethod();
     return application;
   }
+
+  @Test (timeout = 30000)
+  public void testRecoverDrainingStateAfterRMRestart() throws Exception {
+    // init conf
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration();
+    newConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    newConf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        false);
+    newConf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    newConf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{Q1});
+    newConf.setQueues(Q1_PATH, new String[]{Q2});
+    newConf.setCapacity(Q1_PATH, 100);
+    newConf.setCapacity(Q2_PATH, 100);
+
+    // init state store
+    MemoryRMStateStore newMemStore = new MemoryRMStateStore();
+    newMemStore.init(newConf);
+    // init RM & NMs & Nodes
+    MockRM rm = new MockRM(newConf, newMemStore);
+    rm.start();
+    MockNM nm = rm.registerNode("h1:1234", 204800);
+
+    // submit an app, AM is running on nm1
+    RMApp app = rm.submitApp(1024, "appname", "appuser", null, Q2);
+    MockRM.launchAM(app, rm, nm);
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    // update queue state to STOPPED
+    newConf.setState(Q1_PATH, QueueState.STOPPED);
+    CapacityScheduler capacityScheduler =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    capacityScheduler.reinitialize(newConf, rm.getRMContext());
+    // current queue state should be DRAINING
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q1).getState());
+
+    // RM restart
+    rm = new MockRM(newConf, newMemStore);
+    rm.start();
+    rm.registerNode("h1:1234", 204800);
+
+    // queue state should be DRAINING after app recovered
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    capacityScheduler = (CapacityScheduler) rm.getRMContext().getScheduler();
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q1).getState());
+
+    // close rm
+    rm.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message