hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject hadoop git commit: YARN-5756. Add state-machine implementation for scheduler queues. (Xuan Gong via wangda)
Date Wed, 28 Dec 2016 05:18:39 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 0ddb8defa -> 0840b4329


YARN-5756. Add state-machine implementation for scheduler queues. (Xuan Gong via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0840b432
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0840b432
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0840b432

Branch: refs/heads/trunk
Commit: 0840b4329b2428b20b862f70d72cbdcd6d1618ed
Parents: 0ddb8de
Author: Wangda Tan <wangda@apache.org>
Authored: Tue Dec 27 21:18:24 2016 -0800
Committer: Wangda Tan <wangda@apache.org>
Committed: Tue Dec 27 21:18:33 2016 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/QueueState.java     |  11 +-
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../scheduler/QueueStateManager.java            | 100 ++++++++++++
 .../scheduler/SchedulerQueue.java               |  69 ++++++++
 .../scheduler/SchedulerQueueManager.java        |   3 +-
 .../scheduler/capacity/AbstractCSQueue.java     | 119 +++++++++++---
 .../scheduler/capacity/CSQueue.java             |   4 +-
 .../scheduler/capacity/CapacityScheduler.java   |   5 +
 .../capacity/CapacitySchedulerContext.java      |   2 +
 .../capacity/CapacitySchedulerQueueManager.java |  13 ++
 .../scheduler/capacity/LeafQueue.java           |  19 ++-
 .../scheduler/capacity/ParentQueue.java         |  27 +++-
 .../scheduler/capacity/TestLeafQueue.java       |  11 ++
 .../scheduler/capacity/TestQueueState.java      | 104 ++++++++++++
 .../capacity/TestQueueStateManager.java         | 162 +++++++++++++++++++
 15 files changed, 621 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
index 2bc0407..86fd8b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
@@ -29,6 +29,10 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
  * <ul>
  *   <li>{@link #RUNNING} - normal state.</li>
  *   <li>{@link #STOPPED} - not accepting new application submissions.</li>
+ *   <li>
+ *     {@link #DRAINING} - not accepting new application submissions
+ *     and waiting for applications to finish.
+ *   </li>
  * </ul>
  * 
  * @see QueueInfo
@@ -41,7 +45,12 @@ public enum QueueState {
    * Stopped - Not accepting submissions of new applications.
    */
   STOPPED,
-  
+
+  /**
+   * Draining - Not accepting submissions of new applications,
+   * and waiting for applications to finish.
+   */
+  DRAINING,
   /**
    * Running - normal operation.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 5a70298..a8ba740 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -427,6 +427,7 @@ message YarnClusterMetricsProto {
 enum QueueStateProto {
   Q_STOPPED = 1;
   Q_RUNNING = 2;
+  Q_DRAINING = 3;
 }
 
 message QueueStatisticsProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
new file mode 100644
index 0000000..761817e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+
+/**
+ *
+ * QueueStateManager which can be used by Scheduler to manage the queue state.
+ *
+ */
+// TODO: The class will be used by YARN-5734-OrgQueue for
+// easy CapacityScheduler queue configuration management.
+@SuppressWarnings("rawtypes")
+@Private
+@Unstable
+public class QueueStateManager<T extends SchedulerQueue,
+    E extends ReservationSchedulerConfiguration> {
+
+  private static final Log LOG = LogFactory.getLog(QueueStateManager.class);
+
+  private SchedulerQueueManager<T, E> queueManager;
+
+  public synchronized void initialize(SchedulerQueueManager<T, E>
+      newQueueManager) {
+    this.queueManager = newQueueManager;
+  }
+
+  /**
+   * Stop the queue.
+   * @param queueName the queue name
+   * @throws YarnException if the queue does not exist
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void stopQueue(String queueName) throws YarnException {
+    SchedulerQueue<T> queue = queueManager.getQueue(queueName);
+    if (queue == null) {
+      throw new YarnException("The specified queue:" + queueName
+          + " does not exist!");
+    }
+    queue.stopQueue();
+  }
+
+  /**
+   * Activate the queue.
+   * @param queueName the queue name
+   * @throws YarnException if the queue does not exist
+   *         or the queue can not be activated.
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void activateQueue(String queueName)
+      throws YarnException {
+    SchedulerQueue<T> queue = queueManager.getQueue(queueName);
+    if (queue == null) {
+      throw new YarnException("The specified queue:" + queueName
+          + " does not exist!");
+    }
+    queue.activeQueue();
+  }
+
+  /**
+   * Whether this queue can be deleted.
+   * @param queueName the queue name
+   * @return true if the queue can be deleted
+   */
+  @SuppressWarnings("unchecked")
+  public boolean canDelete(String queueName) {
+    SchedulerQueue<T> queue = queueManager.getQueue(queueName);
+    if (queue == null) {
+      LOG.info("The specified queue:" + queueName + " does not exist!");
+      return false;
+    }
+    if (queue.getState() == QueueState.STOPPED){
+      return true;
+    }
+    LOG.info("Need to stop the specific queue:" + queueName + " first.");
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
new file mode 100644
index 0000000..9a67e01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ *
+ * Represents a queue in Scheduler.
+ *
+ */
+@SuppressWarnings("rawtypes")
+@LimitedPrivate("yarn")
+public interface SchedulerQueue<T extends SchedulerQueue> extends Queue {
+
+  /**
+   * Get list of child queues.
+   * @return a list of child queues
+   */
+  List<T> getChildQueues();
+
+  /**
+   * Get the parent queue.
+   * @return the parent queue
+   */
+  T getParent();
+
+  /**
+   * Get current queue state.
+   * @return the queue state
+   */
+  QueueState getState();
+
+  /**
+   * Update the queue state.
+   * @param state the queue state
+   */
+  void updateQueueState(QueueState state);
+
+  /**
+   * Stop the queue.
+   */
+  void stopQueue();
+
+  /**
+   * Activate the queue.
+   * @throws YarnException if the queue can not be activated.
+   */
+  void activeQueue() throws YarnException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
index 92b989a..24797a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
@@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSche
  * Context of the Queues in Scheduler.
  *
  */
+@SuppressWarnings("rawtypes")
 @Private
 @Unstable
-public interface SchedulerQueueManager<T extends Queue,
+public interface SchedulerQueueManager<T extends SchedulerQueue,
     E extends ReservationSchedulerConfiguration> {
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 3372392..d1fa410 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueStatistics;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessRequest;
@@ -77,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   final Resource minimumAllocation;
   volatile Resource maximumAllocation;
-  volatile QueueState state;
+  private volatile QueueState state = null;
   final CSQueueMetrics metrics;
   protected final PrivilegedEntity queueEntity;
 
@@ -292,9 +293,15 @@ public abstract class AbstractCSQueue implements CSQueue {
           csContext.getConfiguration().getMaximumAllocationPerQueue(
               getQueuePath());
 
-      authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
+      // initialize the queue state based on previous state, configured state
+      // and its parent state.
+      QueueState previous = getState();
+      QueueState configuredState = csContext.getConfiguration()
+          .getConfiguredState(getQueuePath());
+      QueueState parentState = (parent == null) ? null : parent.getState();
+      initializeQueueState(previous, configuredState, parentState);
 
-      initializeQueueState();
+      authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
 
       this.acls = csContext.getConfiguration().getAcls(getQueuePath());
 
@@ -334,26 +341,53 @@ public abstract class AbstractCSQueue implements CSQueue {
     }
   }
 
-  private void initializeQueueState() {
-    // inherit from parent if state not set, only do this when we are not root
-    if (parent != null) {
-      QueueState configuredState = csContext.getConfiguration()
-          .getConfiguredState(getQueuePath());
-      QueueState parentState = parent.getState();
-      if (configuredState == null) {
-        this.state = parentState;
-      } else if (configuredState == QueueState.RUNNING
-          && parentState == QueueState.STOPPED) {
-        throw new IllegalArgumentException(
-            "The parent queue:" + parent.getQueueName() + " state is STOPPED, "
-            + "child queue:" + queueName + " state cannot be RUNNING.");
+  private void initializeQueueState(QueueState previousState,
+      QueueState configuredState, QueueState parentState) {
+    // verify that the configured state, if set, is RUNNING or STOPPED
+    if (configuredState != null && configuredState != QueueState.RUNNING
+        && configuredState != QueueState.STOPPED) {
+      throw new IllegalArgumentException("Invalid queue state configuration."
+          + " We can only use RUNNING or STOPPED.");
+    }
+    // If we did not set state in configuration, use Running as default state
+    QueueState defaultState = QueueState.RUNNING;
+
+    if (previousState == null) {
+      // If the current state of the queue is null, we inherit the state
+      // from its parent. If this queue has no parent, such as the root queue,
+      // we use the configured state.
+      if (parentState == null) {
+        updateQueueState((configuredState == null) ? defaultState
+            : configuredState);
       } else {
-        this.state = configuredState;
+        if (configuredState == null) {
+          updateQueueState((parentState == QueueState.DRAINING) ?
+              QueueState.STOPPED : parentState);
+        } else if (configuredState == QueueState.RUNNING
+            && parentState != QueueState.RUNNING) {
+          throw new IllegalArgumentException(
+              "The parent queue:" + parent.getQueueName()
+              + " state is STOPPED, child queue:" + queueName
+              + " state cannot be RUNNING.");
+        } else {
+          updateQueueState(configuredState);
+        }
       }
     } else {
-      // if this is the root queue, get the state from the configuration.
-      // if the state is not set, use RUNNING as default state.
-      this.state = csContext.getConfiguration().getState(getQueuePath());
+      // On a refreshQueue request from AdminService, apply the configured state.
+      if (previousState == QueueState.RUNNING) {
+        if (configuredState == QueueState.STOPPED) {
+          stopQueue();
+        }
+      } else {
+        if (configuredState == QueueState.RUNNING) {
+          try {
+            activeQueue();
+          } catch (YarnException ex) {
+            throw new IllegalArgumentException(ex.getMessage());
+          }
+        }
+      }
     }
   }
 
@@ -367,7 +401,7 @@ public abstract class AbstractCSQueue implements CSQueue {
     queueInfo.setAccessibleNodeLabels(accessibleLabels);
     queueInfo.setCapacity(queueCapacities.getCapacity());
     queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
-    queueInfo.setQueueState(state);
+    queueInfo.setQueueState(getState());
     queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
     queueInfo.setCurrentCapacity(getUsedCapacity());
     queueInfo.setQueueStatistics(getQueueStatistics());
@@ -846,4 +880,47 @@ public abstract class AbstractCSQueue implements CSQueue {
       String userName, String queue) throws AccessControlException {
     // Dummy implementation
   }
+
+  @Override
+  public void updateQueueState(QueueState queueState) {
+    this.state = queueState;
+  }
+
+  @Override
+  public void activeQueue() throws YarnException {
+    try {
+      this.writeLock.lock();
+      if (getState() == QueueState.RUNNING) {
+        LOG.info("The specified queue:" + queueName
+            + " is already in the RUNNING state.");
+      } else if (getState() == QueueState.DRAINING) {
+        throw new YarnException(
+            "The queue:" + queueName + " is in the Stopping process. "
+            + "Please wait for the queue getting fully STOPPED.");
+      } else {
+        CSQueue parent = getParent();
+        if (parent == null || parent.getState() == QueueState.RUNNING) {
+          updateQueueState(QueueState.RUNNING);
+        } else {
+          throw new YarnException("The parent Queue:" + parent.getQueueName()
+              + " is not running. Please activate the parent queue first");
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  protected void appFinished() {
+    try {
+      this.writeLock.lock();
+      if (getState() == QueueState.DRAINING) {
+        if (getNumApplications() == 0) {
+          updateQueueState(QueueState.STOPPED);
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 550e206..e30ec39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -56,8 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleP
  */
 @Stable
 @Private
-public interface CSQueue 
-extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
+public interface CSQueue extends SchedulerQueue<CSQueue> {
   /**
    * Get the parent <code>Queue</code>.
    * @return the parent queue

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9a73a65..5463abd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2474,4 +2474,9 @@ public class CapacityScheduler extends
     }
     return 0;
   }
+
+  @Override
+  public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
+    return this.queueManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index c41a7bf..7d29619 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -83,4 +83,6 @@ public interface CapacitySchedulerContext {
   ResourceUsage getClusterResourceUsage();
 
   ActivitiesManager getActivitiesManager();
+
+  CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 7a6ce56..6a3c08a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.security.Permission;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
 
@@ -86,6 +87,9 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
   private CSQueue root;
   private final RMNodeLabelsManager labelManager;
 
+  private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
+      queueStateManager;
+
   /**
    * Construct the service.
    * @param conf the configuration
@@ -95,6 +99,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
       RMNodeLabelsManager labelManager) {
     this.authorizer = YarnAuthorizationProvider.getInstance(conf);
     this.labelManager = labelManager;
+    this.queueStateManager = new QueueStateManager<>();
   }
 
   @Override
@@ -142,6 +147,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
         CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
     setQueueAcls(authorizer, queues);
     labelManager.reinitializeQueueLabels(getQueueToLabels());
+    this.queueStateManager.initialize(this);
     LOG.info("Initialized root queue " + root);
   }
 
@@ -170,6 +176,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
         clusterResource));
 
     labelManager.reinitializeQueueLabels(getQueueToLabels());
+    this.queueStateManager.initialize(this);
   }
 
   /**
@@ -358,4 +365,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
     }
     return queueToLabels;
   }
+
+  @Private
+  public QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
+      getQueueStateManager() {
+    return this.queueStateManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 93c0693..18b38f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -274,7 +274,7 @@ public class LeafQueue extends AbstractCSQueue {
               + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
               + maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
               + "numContainers = " + numContainers
-              + " [= currentNumContainers ]" + "\n" + "state = " + state
+              + " [= currentNumContainers ]" + "\n" + "state = " + getState()
               + " [= configuredState ]" + "\n" + "acls = " + aclsString
               + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
               + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
@@ -880,6 +880,9 @@ public class LeafQueue extends AbstractCSQueue {
   public void finishApplication(ApplicationId application, String user) {
     // Inform the activeUsersManager
     activeUsersManager.deactivateApplication(user, application);
+
+    appFinished();
+
     // Inform the parent queue
     getParent().finishApplication(application, user);
   }
@@ -2428,4 +2431,18 @@ public class LeafQueue extends AbstractCSQueue {
       return clusterResource;
     }
   }
+
+  @Override
+  public void stopQueue() {
+    try {
+      writeLock.lock();
+      if (getNumApplications() > 0) {
+        updateQueueState(QueueState.DRAINING);
+      } else {
+        updateQueueState(QueueState.STOPPED);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 0ba4ede..946fca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -133,7 +133,7 @@ public class ParentQueue extends AbstractCSQueue {
           + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
           + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
           + ", absoluteMaxCapacity=" + this.queueCapacities
-          .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
+          .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
           + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
           + ", reservationsContinueLooking=" + reservationsContinueLooking);
     } finally {
@@ -369,7 +369,7 @@ public class ParentQueue extends AbstractCSQueue {
             "Cannot submit application " + "to non-leaf queue: " + queueName);
       }
 
-      if (state != QueueState.RUNNING) {
+      if (getState() != QueueState.RUNNING) {
         throw new AccessControlException("Queue " + getQueuePath()
             + " is STOPPED. Cannot accept submission of application: "
             + applicationId);
@@ -411,7 +411,9 @@ public class ParentQueue extends AbstractCSQueue {
   public void finishApplication(ApplicationId application, String user) {
 
     removeApplication(application, user);
-    
+
+    appFinished();
+
     // Inform the parent queue
     if (parent != null) {
       parent.finishApplication(application, user);
@@ -1049,4 +1051,23 @@ public class ParentQueue extends AbstractCSQueue {
       parent.apply(cluster, request);
     }
   }
+
+  @Override
+  public void stopQueue() {
+    try {
+      this.writeLock.lock();
+      if (getNumApplications() > 0) {
+        updateQueueState(QueueState.DRAINING);
+      } else {
+        updateQueueState(QueueState.STOPPED);
+      }
+      if (getChildQueues() != null) {
+        for(CSQueue child : getChildQueues()) {
+          child.stopQueue();
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index c0fa43d..e495328 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -953,6 +955,7 @@ public class TestLeafQueue {
 
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test
   public void testComputeUserLimitAndSetHeadroom() throws IOException {
     LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
@@ -974,6 +977,14 @@ public class TestLeafQueue {
     Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
+    CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
+        = mock(CapacitySchedulerQueueManager.class);
+    QueueStateManager mockQueueStateManager = mock(QueueStateManager.class);
+    when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn(
+        mockQueueStateManager);
+    when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
+        mockCapacitySchedulerQueueManager);
+
     //our test plan contains three cases
     //1. single user dominate the queue, we test the headroom
     //2. two users, but user_0 is assigned 100% of the queue resource,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
index bd878b7..9f2933e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -18,10 +18,23 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,11 +45,14 @@ public class TestQueueState {
 
   private static final String Q1 = "q1";
   private static final String Q2 = "q2";
+  private static final String Q3 = "q3";
 
   private final static String Q1_PATH =
       CapacitySchedulerConfiguration.ROOT + "." + Q1;
   private final static String Q2_PATH =
       Q1_PATH + "." + Q2;
+  private final static String Q3_PATH =
+      Q1_PATH + "." + Q3;
   private CapacityScheduler cs;
   private YarnConfiguration conf;
 
@@ -93,4 +109,92 @@ public class TestQueueState {
           + "child queue:q2 state cannot be RUNNING."));
     }
   }
+
+  @Test(timeout = 15000)
+  public void testQueueStateTransit() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
+    csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
+
+    csConf.setCapacity(Q1_PATH, 100);
+    csConf.setCapacity(Q2_PATH, 50);
+    csConf.setCapacity(Q3_PATH, 50);
+
+    conf = new YarnConfiguration(csConf);
+    cs = new CapacityScheduler();
+
+    RMContext rmContext = TestUtils.getMockRMContext();
+    cs.setConf(conf);
+    cs.setRMContext(rmContext);
+    cs.init(conf);
+
+    //by default, the state of ALL queues should be RUNNING
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+    // submit an application to Q2
+    ApplicationId appId = ApplicationId.newInstance(
+            System.currentTimeMillis(), 1);
+    String userName = "testUser";
+    cs.getQueue(Q2).submitApplication(appId, userName, Q2);
+    FiCaSchedulerApp app = getMockApplication(appId, userName,
+        Resources.createResource(4, 0));
+    cs.getQueue(Q2).submitApplicationAttempt(app, userName);
+
+    // Set Q2's state to STOPPED and reinitialize the scheduler.
+    csConf.setState(Q2_PATH, QueueState.STOPPED);
+    conf = new YarnConfiguration(csConf);
+    cs.reinitialize(conf, rmContext);
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+    // Set Q1's state to STOPPED and reinitialize the scheduler.
+    csConf.setState(Q1_PATH, QueueState.STOPPED);
+    conf = new YarnConfiguration(csConf);
+    cs.reinitialize(conf, rmContext);
+    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+    // Activate Q3; reinitialize should fail
+    csConf.setState(Q3_PATH, QueueState.RUNNING);
+    conf = new YarnConfiguration(csConf);
+    try {
+      cs.reinitialize(conf, rmContext);
+      Assert.fail("Should throw an Exception.");
+    } catch (Exception ex) {
+      // Expected: reinitialize is supposed to throw here, nothing to do.
+    }
+
+    // stop the app running in q2
+    cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
+    cs.getQueue(Q2).finishApplication(appId, userName);
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+    
+  }
+
+  private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
+      Resource amResource) {
+    FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    doReturn(applicationAttemptId.getApplicationId()).
+        when(application).getApplicationId();
+    doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
+    doReturn(user).when(application).getUser();
+    doReturn(amResource).when(application).getAMResource();
+    doReturn(Priority.newInstance(0)).when(application).getPriority();
+    doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
+        .getAppAMNodePartitionName();
+    doReturn(amResource).when(application).getAMResource(
+        CommonNodeLabelsManager.NO_LABEL);
+    when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
+        .thenCallRealMethod();
+    return application;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0840b432/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
new file mode 100644
index 0000000..7763dac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test QueueStateManager.
+ *
+ */
+public class TestQueueStateManager {
+  private static final String Q1 = "q1";
+  private static final String Q2 = "q2";
+  private static final String Q3 = "q3";
+
+  private final static String Q1_PATH =
+      CapacitySchedulerConfiguration.ROOT + "." + Q1;
+  private final static String Q2_PATH =
+      Q1_PATH + "." + Q2;
+  private final static String Q3_PATH =
+      Q1_PATH + "." + Q3;
+  private CapacityScheduler cs;
+  private YarnConfiguration conf;
+
+  @Test
+  public void testQueueStateManager() throws AccessControlException,
+      YarnException {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
+    csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
+
+    csConf.setCapacity(Q1_PATH, 100);
+    csConf.setCapacity(Q2_PATH, 50);
+    csConf.setCapacity(Q3_PATH, 50);
+
+    conf = new YarnConfiguration(csConf);
+    cs = new CapacityScheduler();
+
+    RMContext rmContext = TestUtils.getMockRMContext();
+    cs.setConf(conf);
+    cs.setRMContext(rmContext);
+    cs.init(conf);
+
+    @SuppressWarnings("rawtypes")
+    QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager()
+        .getQueueStateManager();
+
+    // By default, the state of all queues should be RUNNING
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+    // Stop Q2, and verify that Q2 transitions to the STOPPED state
+    stateManager.stopQueue(Q2);
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+    // Stop Q1, and verify that Q1, as well as its child Q3,
+    // transitions to the STOPPED state
+    stateManager.stopQueue(Q1);
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+    Assert.assertTrue(stateManager.canDelete(Q1));
+    Assert.assertTrue(stateManager.canDelete(Q2));
+    Assert.assertTrue(stateManager.canDelete(Q3));
+
+    // No activation of Q2 is attempted here; confirm Q2 is still STOPPED.
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+    // Now activate Q1
+    stateManager.activateQueue(Q1);
+    // Q1 should be in RUNNING state. Its children: Q2 and Q3
+    // should still be in STOPPED state.
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+    // Now activate Q2 and Q3
+    stateManager.activateQueue(Q2);
+    stateManager.activateQueue(Q3);
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+    Assert.assertFalse(stateManager.canDelete(Q1));
+    Assert.assertFalse(stateManager.canDelete(Q2));
+    Assert.assertFalse(stateManager.canDelete(Q3));
+
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    String userName = "testUser";
+    cs.getQueue(Q2).submitApplication(appId, userName, Q2);
+    FiCaSchedulerApp app = getMockApplication(appId, userName,
+          Resources.createResource(4, 0));
+    cs.getQueue(Q2).submitApplicationAttempt(app, userName);
+    stateManager.stopQueue(Q1);
+
+    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+    cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
+    cs.getQueue(Q2).finishApplication(appId, userName);
+
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+  }
+
+  private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
+      Resource amResource) {
+    FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    doReturn(applicationAttemptId.getApplicationId()).
+        when(application).getApplicationId();
+    doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
+    doReturn(user).when(application).getUser();
+    doReturn(amResource).when(application).getAMResource();
+    doReturn(Priority.newInstance(0)).when(application).getPriority();
+    doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
+        .getAppAMNodePartitionName();
+    doReturn(amResource).when(application).getAMResource(
+        CommonNodeLabelsManager.NO_LABEL);
+    when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
+        .thenCallRealMethod();
+    return application;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message