hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtcarre...@apache.org
Subject hadoop git commit: YARN-5761. Separate QueueManager from Scheduler. (Xuan Gong via gtcarrera9)
Date Wed, 30 Nov 2016 21:39:23 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 3fd844b99 -> 69fb70c31


YARN-5761. Separate QueueManager from Scheduler. (Xuan Gong via gtcarrera9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69fb70c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69fb70c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69fb70c3

Branch: refs/heads/trunk
Commit: 69fb70c31aa277f7fb14b05c0185ddc5cd90793d
Parents: 3fd844b
Author: Li Lu <gtcarrera9@apache.org>
Authored: Wed Nov 30 13:38:42 2016 -0800
Committer: Li Lu <gtcarrera9@apache.org>
Committed: Wed Nov 30 13:38:42 2016 -0800

----------------------------------------------------------------------
 .../scheduler/SchedulerQueueManager.java        |  75 ++++
 .../scheduler/capacity/CapacityScheduler.java   | 294 +++------------
 .../capacity/CapacitySchedulerQueueManager.java | 361 +++++++++++++++++++
 .../capacity/TestApplicationLimits.java         |  35 +-
 .../TestApplicationLimitsByPartition.java       |   7 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   9 +-
 .../scheduler/capacity/TestLeafQueue.java       |   9 +-
 .../scheduler/capacity/TestParentQueue.java     |  39 +-
 .../scheduler/capacity/TestReservations.java    |   8 +-
 .../scheduler/capacity/TestUtils.java           |   2 +-
 10 files changed, 536 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
new file mode 100644
index 0000000..92b989a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
@@ -0,0 +1,75 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+
+/**
+ *
+ * Context of the Queues in Scheduler.
+ *
+ */
+@Private
+@Unstable
+public interface SchedulerQueueManager<T extends Queue,
+    E extends ReservationSchedulerConfiguration> {
+
+  /**
+   * Get the root queue.
+   * @return root queue
+   */
+  T getRootQueue();
+
+  /**
+   * Get all the queues.
+   * @return a map contains all the queues as well as related queue names
+   */
+  Map<String, T> getQueues();
+
+  /**
+   * Remove the queue from the existing queue.
+   * @param queueName the queue name
+   */
+  void removeQueue(String queueName);
+
+  /**
+   * Add a new queue to the existing queues.
+   * @param queueName the queue name
+   * @param queue the queue object
+   */
+  void addQueue(String queueName, T queue);
+
+  /**
+   * Get a queue matching the specified queue name.
+   * @param queueName the queue name
+   * @return a queue object
+   */
+  T getQueue(String queueName);
+
+  /**
+   * Reinitialize the queues.
+   * @param newConf the configuration
+   * @throws IOException if fails to re-initialize queues
+   */
+  void reinitializeQueues(E newConf) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cc8b3b0..e42b20c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -68,8 +67,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
-import org.apache.hadoop.yarn.security.Permission;
-import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
@@ -156,9 +153,9 @@ public class CapacityScheduler extends
     ResourceAllocationCommitter {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
-  private YarnAuthorizationProvider authorizer;
 
-  private CSQueue root;
+  private CapacitySchedulerQueueManager queueManager;
+
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
@@ -168,22 +165,6 @@ public class CapacityScheduler extends
 
   private int offswitchPerHeartbeatLimit;
 
-  static final Comparator<CSQueue> nonPartitionedQueueComparator =
-      new Comparator<CSQueue>() {
-    @Override
-    public int compare(CSQueue q1, CSQueue q2) {
-      if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
-        return -1;
-      } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
-        return 1;
-      }
-
-      return q1.getQueuePath().compareTo(q2.getQueuePath());
-    }
-  };
-  
-  static final PartitionedQueueComparator partitionedQueueComparator =
-      new PartitionedQueueComparator();
 
   @Override
   public void setConf(Configuration conf) {
@@ -236,8 +217,6 @@ public class CapacityScheduler extends
   private CapacitySchedulerConfiguration conf;
   private Configuration yarnConf;
 
-  private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
-
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
@@ -261,11 +240,11 @@ public class CapacityScheduler extends
 
   @Override
   public QueueMetrics getRootQueueMetrics() {
-    return root.getMetrics();
+    return getRootQueue().getMetrics();
   }
 
   public CSQueue getRootQueue() {
-    return root;
+    return queueManager.getRootQueue();
   }
 
   @Override
@@ -290,12 +269,12 @@ public class CapacityScheduler extends
 
   @Override
   public Comparator<CSQueue> getNonPartitionedQueueComparator() {
-    return nonPartitionedQueueComparator;
+    return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR;
   }
 
   @Override
   public PartitionedQueueComparator getPartitionedQueueComparator() {
-    return partitionedQueueComparator;
+    return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR;
   }
 
   @Override
@@ -326,7 +305,10 @@ public class CapacityScheduler extends
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
       this.applications = new ConcurrentHashMap<>();
       this.labelManager = rmContext.getNodeLabelManager();
-      authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+      this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
+          this.labelManager);
+      this.queueManager.setCapacitySchedulerContext(this);
+
       this.activitiesManager = new ActivitiesManager(rmContext);
       activitiesManager.init(conf);
       initializeQueues(this.conf);
@@ -554,13 +536,6 @@ public class CapacityScheduler extends
     }
   }
 
-  static class QueueHook {
-    public CSQueue hook(CSQueue queue) {
-      return queue;
-    }
-  }
-  private static final QueueHook noop = new QueueHook();
-
   @VisibleForTesting
   public UserGroupMappingPlacementRule
       getUserGroupMappingPlacementRule() throws IOException {
@@ -578,7 +553,7 @@ public class CapacityScheduler extends
         if (!mappingQueue.equals(
             UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
             .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
-          CSQueue queue = queues.get(mappingQueue);
+          CSQueue queue = getQueue(mappingQueue);
           if (queue == null || !(queue instanceof LeafQueue)) {
             throw new IOException(
                 "mapping contains invalid or non-leaf queue " + mappingQueue);
@@ -616,184 +591,29 @@ public class CapacityScheduler extends
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
 
-    root =
-        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
-            queues, queues, noop);
-    labelManager.reinitializeQueueLabels(getQueueToLabels());
-    LOG.info("Initialized root queue " + root);
+    this.queueManager.initializeQueues(conf);
+
     updatePlacementRules();
-    setQueueAcls(authorizer, queues);
 
     // Notify Preemption Manager
-    preemptionManager.refreshQueues(null, root);
+    preemptionManager.refreshQueues(null, this.getRootQueue());
   }
 
   @Lock(CapacityScheduler.class)
   private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
   throws IOException {
-    // Parse new queues
-    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
-    CSQueue newRoot =
-        parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
-            newQueues, queues, noop);
-
-    // Ensure all existing queues are still present
-    validateExistingQueues(queues, newQueues);
-
-    // Add new queues
-    addNewQueues(queues, newQueues);
-
-    // Re-configure queues
-    root.reinitialize(newRoot, getClusterResource());
+    this.queueManager.reinitializeQueues(newConf);
     updatePlacementRules();
 
-    // Re-calculate headroom for active applications
-    Resource clusterResource = getClusterResource();
-    root.updateClusterResource(clusterResource, new ResourceLimits(
-        clusterResource));
-
-    labelManager.reinitializeQueueLabels(getQueueToLabels());
-    setQueueAcls(authorizer, queues);
-
     // Notify Preemption Manager
-    preemptionManager.refreshQueues(null, root);
-  }
-
-  @VisibleForTesting
-  public static void setQueueAcls(YarnAuthorizationProvider authorizer,
-      Map<String, CSQueue> queues) throws IOException {
-    List<Permission> permissions = new ArrayList<>();
-    for (CSQueue queue : queues.values()) {
-      AbstractCSQueue csQueue = (AbstractCSQueue) queue;
-      permissions.add(
-          new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
-    }
-    authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser());
-  }
-
-  private Map<String, Set<String>> getQueueToLabels() {
-    Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
-    for (CSQueue queue : queues.values()) {
-      queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
-    }
-    return queueToLabels;
-  }
-
-  /**
-   * Ensure all existing queues are present. Queues cannot be deleted
-   * @param queues existing queues
-   * @param newQueues new queues
-   */
-  @Lock(CapacityScheduler.class)
-  private void validateExistingQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
-  throws IOException {
-    // check that all static queues are included in the newQueues list
-    for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
-      if (!(e.getValue() instanceof ReservationQueue)) {
-        String queueName = e.getKey();
-        CSQueue oldQueue = e.getValue();
-        CSQueue newQueue = newQueues.get(queueName);
-        if (null == newQueue) {
-          throw new IOException(queueName + " cannot be found during refresh!");
-        } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
-          throw new IOException(queueName + " is moved from:"
-              + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
-              + " after refresh, which is not allowed.");
-        }
-      }
-    }
-  }
-
-  /**
-   * Add the new queues (only) to our list of queues...
-   * ... be careful, do not overwrite existing queues.
-   * @param queues
-   * @param newQueues
-   */
-  @Lock(CapacityScheduler.class)
-  private void addNewQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
-  {
-    for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
-      String queueName = e.getKey();
-      CSQueue queue = e.getValue();
-      if (!queues.containsKey(queueName)) {
-        queues.put(queueName, queue);
-      }
-    }
-  }
-
-  @Lock(CapacityScheduler.class)
-  static CSQueue parseQueue(
-      CapacitySchedulerContext csContext,
-      CapacitySchedulerConfiguration conf,
-      CSQueue parent, String queueName, Map<String, CSQueue> queues,
-      Map<String, CSQueue> oldQueues,
-      QueueHook hook) throws IOException {
-    CSQueue queue;
-    String fullQueueName =
-        (parent == null) ? queueName
-            : (parent.getQueuePath() + "." + queueName);
-    String[] childQueueNames =
-      conf.getQueues(fullQueueName);
-    boolean isReservableQueue = conf.isReservable(fullQueueName);
-    if (childQueueNames == null || childQueueNames.length == 0) {
-      if (null == parent) {
-        throw new IllegalStateException(
-            "Queue configuration missing child queue names for " + queueName);
-      }
-      // Check if the queue will be dynamically managed by the Reservation
-      // system
-      if (isReservableQueue) {
-        queue =
-            new PlanQueue(csContext, queueName, parent,
-                oldQueues.get(queueName));
-      } else {
-        queue =
-            new LeafQueue(csContext, queueName, parent,
-                oldQueues.get(queueName));
-
-        // Used only for unit tests
-        queue = hook.hook(queue);
-      }
-    } else {
-      if (isReservableQueue) {
-        throw new IllegalStateException(
-            "Only Leaf Queues can be reservable for " + queueName);
-      }
-      ParentQueue parentQueue =
-        new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
-
-      // Used only for unit tests
-      queue = hook.hook(parentQueue);
-
-      List<CSQueue> childQueues = new ArrayList<CSQueue>();
-      for (String childQueueName : childQueueNames) {
-        CSQueue childQueue =
-          parseQueue(csContext, conf, queue, childQueueName,
-              queues, oldQueues, hook);
-        childQueues.add(childQueue);
-      }
-      parentQueue.setChildQueues(childQueues);
-    }
-
-    if (queue instanceof LeafQueue && queues.containsKey(queueName)
-        && queues.get(queueName) instanceof LeafQueue) {
-      throw new IOException("Two leaf queues were named " + queueName
-          + ". Leaf queue names must be distinct");
-    }
-    queues.put(queueName, queue);
-
-    LOG.info("Initialized queue: " + queue);
-    return queue;
+    preemptionManager.refreshQueues(null, this.getRootQueue());
   }
 
   public CSQueue getQueue(String queueName) {
     if (queueName == null) {
       return null;
     }
-    return queues.get(queueName);
+    return this.queueManager.getQueue(queueName);
   }
 
   private void addApplicationOnRecovery(
@@ -1047,7 +867,7 @@ public class CapacityScheduler extends
 
       // Inform the queue
       String queueName = attempt.getQueue().getQueueName();
-      CSQueue queue = queues.get(queueName);
+      CSQueue queue = this.getQueue(queueName);
       if (!(queue instanceof LeafQueue)) {
         LOG.error(
             "Cannot finish application " + "from non-leaf queue: " + queueName);
@@ -1174,7 +994,7 @@ public class CapacityScheduler extends
       boolean includeChildQueues, boolean recursive)
   throws IOException {
     CSQueue queue = null;
-    queue = this.queues.get(queueName);
+    queue = this.getQueue(queueName);
     if (queue == null) {
       throw new IOException("Unknown queue: " + queueName);
     }
@@ -1192,7 +1012,7 @@ public class CapacityScheduler extends
       return new ArrayList<QueueUserACLInfo>();
     }
 
-    return root.getQueueUserAclInfo(user);
+    return getRootQueue().getQueueUserAclInfo(user);
   }
 
   @Override
@@ -1235,7 +1055,7 @@ public class CapacityScheduler extends
       writeLock.lock();
       updateNodeResource(nm, resourceOption);
       Resource clusterResource = getClusterResource();
-      root.updateClusterResource(clusterResource,
+      getRootQueue().updateClusterResource(clusterResource,
           new ResourceLimits(clusterResource));
     } finally {
       writeLock.unlock();
@@ -1471,8 +1291,8 @@ public class CapacityScheduler extends
 
   private CSAssignment allocateOrReserveNewContainers(
       PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
-    CSAssignment assignment = root.assignContainers(getClusterResource(), ps,
-        new ResourceLimits(labelManager
+    CSAssignment assignment = getRootQueue().assignContainers(
+        getClusterResource(), ps, new ResourceLimits(labelManager
             .getResourceByLabel(ps.getPartition(), getClusterResource())),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
@@ -1506,7 +1326,7 @@ public class CapacityScheduler extends
     }
 
     // Try to use NON_EXCLUSIVE
-    assignment = root.assignContainers(getClusterResource(), ps,
+    assignment = getRootQueue().assignContainers(getClusterResource(), ps,
         // TODO, now we only consider limits for parent for non-labeled
         // resources, should consider labeled resources as well.
         new ResourceLimits(labelManager
@@ -1526,8 +1346,8 @@ public class CapacityScheduler extends
       PlacementSet<FiCaSchedulerNode> ps) {
     // When this time look at multiple nodes, try schedule if the
     // partition has any available resource or killable resource
-    if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f
-        && preemptionManager.getKillableResource(
+    if (getRootQueue().getQueueCapacities().getUsedCapacity(
+        ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
         CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
         .none()) {
       if (LOG.isDebugEnabled()) {
@@ -1710,7 +1530,7 @@ public class CapacityScheduler extends
         updateLabelsOnNode(id, labels);
       }
       Resource clusterResource = getClusterResource();
-      root.updateClusterResource(clusterResource,
+      getRootQueue().updateClusterResource(clusterResource,
           new ResourceLimits(clusterResource));
     } finally {
       writeLock.unlock();
@@ -1731,7 +1551,7 @@ public class CapacityScheduler extends
       }
 
       Resource clusterResource = getClusterResource();
-      root.updateClusterResource(clusterResource,
+      getRootQueue().updateClusterResource(clusterResource,
           new ResourceLimits(clusterResource));
 
       LOG.info(
@@ -1782,7 +1602,7 @@ public class CapacityScheduler extends
 
       nodeTracker.removeNode(nodeId);
       Resource clusterResource = getClusterResource();
-      root.updateClusterResource(clusterResource,
+      getRootQueue().updateClusterResource(clusterResource,
           new ResourceLimits(clusterResource));
       int numNodes = nodeTracker.nodeCount();
 
@@ -2020,7 +1840,7 @@ public class CapacityScheduler extends
 
   @Override
   public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
-    CSQueue queue = queues.get(queueName);
+    CSQueue queue = getQueue(queueName);
     if (queue == null) {
       return null;
     }
@@ -2030,7 +1850,8 @@ public class CapacityScheduler extends
   }
 
   public boolean isSystemAppsLimitReached() {
-    if (root.getNumApplications() < conf.getMaximumSystemApplications()) {
+    if (getRootQueue().getNumApplications() < conf
+        .getMaximumSystemApplications()) {
       return false;
     }
     return true;
@@ -2131,7 +1952,7 @@ public class CapacityScheduler extends
       }
 
       ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
-      this.queues.remove(queueName);
+      this.queueManager.removeQueue(queueName);
       LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
     } finally {
       writeLock.unlock();
@@ -2160,7 +1981,7 @@ public class CapacityScheduler extends
       PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
       String queuename = newQueue.getQueueName();
       parentPlan.addChildQueue(newQueue);
-      this.queues.put(queuename, newQueue);
+      this.queueManager.addQueue(queuename, newQueue);
       LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
     } finally {
       writeLock.unlock();
@@ -2172,7 +1993,7 @@ public class CapacityScheduler extends
       throws YarnException {
     try {
       writeLock.lock();
-      LeafQueue queue = getAndCheckLeafQueue(inQueue);
+      LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
       ParentQueue parent = (ParentQueue) queue.getParent();
 
       if (!(queue instanceof ReservationQueue)) {
@@ -2224,9 +2045,10 @@ public class CapacityScheduler extends
       FiCaSchedulerApp app = getApplicationAttempt(
           ApplicationAttemptId.newInstance(appId, 0));
       String sourceQueueName = app.getQueue().getQueueName();
-      LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+      LeafQueue source = this.queueManager.getAndCheckLeafQueue(
+          sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(targetQueueName);
-      LeafQueue dest = getAndCheckLeafQueue(destQueueName);
+      LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
       // Validation check - ACLs, submission limits for user & queue
       String user = app.getUser();
       checkQueuePartition(app, dest);
@@ -2290,27 +2112,6 @@ public class CapacityScheduler extends
     }
   }
 
-  /**
-   * Check that the String provided in input is the name of an existing,
-   * LeafQueue, if successful returns the queue.
-   *
-   * @param queue
-   * @return the LeafQueue
-   * @throws YarnException
-   */
-  private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
-    CSQueue ret = this.getQueue(queue);
-    if (ret == null) {
-      throw new YarnException("The specified Queue: " + queue
-          + " doesn't exist");
-    }
-    if (!(ret instanceof LeafQueue)) {
-      throw new YarnException("The specified Queue: " + queue
-          + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
-    }
-    return (LeafQueue) ret;
-  }
-
   /** {@inheritDoc} */
   @Override
   public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
@@ -2347,7 +2148,7 @@ public class CapacityScheduler extends
   @Override
   public Set<String> getPlanQueues() {
     Set<String> ret = new HashSet<String>();
-    for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
+    for (Map.Entry<String, CSQueue> l : queueManager.getQueues().entrySet()) {
       if (l.getValue() instanceof PlanQueue) {
         ret.add(l.getKey());
       }
@@ -2367,7 +2168,8 @@ public class CapacityScheduler extends
     if (null == priorityFromContext) {
       // Get the default priority for the Queue. If Queue is non-existent, then
       // use default priority
-      priorityFromContext = getDefaultPriorityForQueue(queueName);
+      priorityFromContext = this.queueManager.getDefaultPriorityForQueue(
+          queueName);
 
       LOG.info("Application '" + applicationId
           + "' is submitted without priority "
@@ -2391,18 +2193,6 @@ public class CapacityScheduler extends
     return appPriority;
   }
 
-  private Priority getDefaultPriorityForQueue(String queueName) {
-    Queue queue = getQueue(queueName);
-    if (null == queue || null == queue.getDefaultApplicationPriority()) {
-      // Return with default application priority
-      return Priority.newInstance(CapacitySchedulerConfiguration
-          .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
-    }
-
-    return Priority.newInstance(queue.getDefaultApplicationPriority()
-        .getPriority());
-  }
-
   @Override
   public Priority updateApplicationPriority(Priority newPriority,
       ApplicationId applicationId, SettableFuture<Object> future)
@@ -2456,7 +2246,7 @@ public class CapacityScheduler extends
 
   @Override
   public ResourceUsage getClusterResourceUsage() {
-    return root.getQueueResourceUsage();
+    return getRootQueue().getQueueResourceUsage();
   }
 
   private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> getSchedulerContainer(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
new file mode 100644
index 0000000..7a6ce56
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -0,0 +1,361 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.Permission;
+import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
+
+/**
+ *
+ * Context of the Queues in Capacity Scheduler.
+ *
+ */
+@Private
+@Unstable
+public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
+    CSQueue, CapacitySchedulerConfiguration>{
+
+  private static final Log LOG = LogFactory.getLog(
+      CapacitySchedulerQueueManager.class);
+
+  static final Comparator<CSQueue> NON_PARTITIONED_QUEUE_COMPARATOR =
+      new Comparator<CSQueue>() {
+    @Override
+    public int compare(CSQueue q1, CSQueue q2) {
+      if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
+        return -1;
+      } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
+        return 1;
+      }
+
+      return q1.getQueuePath().compareTo(q2.getQueuePath());
+    }
+  };
+
+  static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR =
+      new PartitionedQueueComparator();
+
+  static class QueueHook {
+    public CSQueue hook(CSQueue queue) {
+      return queue;
+    }
+  }
+
+  private static final QueueHook NOOP = new QueueHook();
+  private CapacitySchedulerContext csContext;
+  private final YarnAuthorizationProvider authorizer;
+  private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
+  private CSQueue root;
+  private final RMNodeLabelsManager labelManager;
+
+  /**
+   * Construct the service.
+   * @param conf the configuration
+   * @param labelManager the labelManager
+   */
+  public CapacitySchedulerQueueManager(Configuration conf,
+      RMNodeLabelsManager labelManager) {
+    this.authorizer = YarnAuthorizationProvider.getInstance(conf);
+    this.labelManager = labelManager;
+  }
+
+  @Override
+  public CSQueue getRootQueue() {
+    return this.root;
+  }
+
+  @Override
+  public Map<String, CSQueue> getQueues() {
+    return queues;
+  }
+
+  @Override
+  public void removeQueue(String queueName) {
+    this.queues.remove(queueName);
+  }
+
+  @Override
+  public void addQueue(String queueName, CSQueue queue) {
+    this.queues.put(queueName, queue);
+  }
+
+  @Override
+  public CSQueue getQueue(String queueName) {
+    return queues.get(queueName);
+  }
+
+  /**
+   * Set the CapacitySchedulerContext.
+   * @param capacitySchedulerContext the CapacitySchedulerContext
+   */
+  public void setCapacitySchedulerContext(
+      CapacitySchedulerContext capacitySchedulerContext) {
+    this.csContext = capacitySchedulerContext;
+  }
+
+  /**
+   * Initialized the queues.
+   * @param conf the CapacitySchedulerConfiguration
+   * @throws IOException if fails to initialize queues
+   */
+  public void initializeQueues(CapacitySchedulerConfiguration conf)
+      throws IOException {
+    root = parseQueue(this.csContext, conf, null,
+        CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
+    setQueueAcls(authorizer, queues);
+    labelManager.reinitializeQueueLabels(getQueueToLabels());
+    LOG.info("Initialized root queue " + root);
+  }
+
+  @Override
+  public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
+      throws IOException {
+    // Parse new queues
+    Map<String, CSQueue> newQueues = new HashMap<>();
+    CSQueue newRoot =  parseQueue(this.csContext, newConf, null,
+        CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
+
+    // Ensure all existing queues are still present
+    validateExistingQueues(queues, newQueues);
+
+    // Add new queues
+    addNewQueues(queues, newQueues);
+
+    // Re-configure queues
+    root.reinitialize(newRoot, this.csContext.getClusterResource());
+
+    setQueueAcls(authorizer, queues);
+
+    // Re-calculate headroom for active applications
+    Resource clusterResource = this.csContext.getClusterResource();
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
+
+    labelManager.reinitializeQueueLabels(getQueueToLabels());
+  }
+
+  /**
+   * Parse the queue from the configuration.
+   * @param csContext the CapacitySchedulerContext
+   * @param conf the CapacitySchedulerConfiguration
+   * @param parent the parent queue
+   * @param queueName the queue name
+   * @param queues all the queues
+   * @param oldQueues the old queues
+   * @param hook the queue hook
+   * @return the CSQueue
+   * @throws IOException
+   */
+  static CSQueue parseQueue(
+      CapacitySchedulerContext csContext,
+      CapacitySchedulerConfiguration conf,
+      CSQueue parent, String queueName, Map<String, CSQueue> queues,
+      Map<String, CSQueue> oldQueues,
+      QueueHook hook) throws IOException {
+    CSQueue queue;
+    String fullQueueName =
+        (parent == null) ? queueName
+            : (parent.getQueuePath() + "." + queueName);
+    String[] childQueueNames = conf.getQueues(fullQueueName);
+    boolean isReservableQueue = conf.isReservable(fullQueueName);
+    if (childQueueNames == null || childQueueNames.length == 0) {
+      if (null == parent) {
+        throw new IllegalStateException(
+            "Queue configuration missing child queue names for " + queueName);
+      }
+      // Check if the queue will be dynamically managed by the Reservation
+      // system
+      if (isReservableQueue) {
+        queue =
+            new PlanQueue(csContext, queueName, parent,
+                oldQueues.get(queueName));
+      } else {
+        queue =
+            new LeafQueue(csContext, queueName, parent,
+                oldQueues.get(queueName));
+
+        // Used only for unit tests
+        queue = hook.hook(queue);
+      }
+    } else {
+      if (isReservableQueue) {
+        throw new IllegalStateException(
+            "Only Leaf Queues can be reservable for " + queueName);
+      }
+      ParentQueue parentQueue =
+          new ParentQueue(csContext, queueName, parent,
+              oldQueues.get(queueName));
+
+      // Used only for unit tests
+      queue = hook.hook(parentQueue);
+
+      List<CSQueue> childQueues = new ArrayList<>();
+      for (String childQueueName : childQueueNames) {
+        CSQueue childQueue =
+            parseQueue(csContext, conf, queue, childQueueName,
+              queues, oldQueues, hook);
+        childQueues.add(childQueue);
+      }
+      parentQueue.setChildQueues(childQueues);
+    }
+
+    if (queue instanceof LeafQueue && queues.containsKey(queueName)
+        && queues.get(queueName) instanceof LeafQueue) {
+      throw new IOException("Two leaf queues were named " + queueName
+          + ". Leaf queue names must be distinct");
+    }
+    queues.put(queueName, queue);
+
+    LOG.info("Initialized queue: " + queue);
+    return queue;
+  }
+
+  /**
+   * Ensure all existing queues are present. Queues cannot be deleted
+   * @param queues existing queues
+   * @param newQueues new queues
+   */
+  private void validateExistingQueues(
+      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
+      throws IOException {
+    // check that all static queues are included in the newQueues list
+    for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
+      if (!(e.getValue() instanceof ReservationQueue)) {
+        String queueName = e.getKey();
+        CSQueue oldQueue = e.getValue();
+        CSQueue newQueue = newQueues.get(queueName);
+        if (null == newQueue) {
+          throw new IOException(queueName + " cannot be found during refresh!");
+        } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
+          throw new IOException(queueName + " is moved from:"
+              + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
+              + " after refresh, which is not allowed.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Add the new queues (only) to our list of queues...
+   * ... be careful, do not overwrite existing queues.
+   * @param queues the existing queues
+   * @param newQueues the new queues
+   */
+  private void addNewQueues(
+      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) {
+    for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
+      String queueName = e.getKey();
+      CSQueue queue = e.getValue();
+      if (!queues.containsKey(queueName)) {
+        queues.put(queueName, queue);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  /**
+   * Set the acls for the queues.
+   * @param authorizer the yarnAuthorizationProvider
+   * @param queues the queues
+   * @throws IOException if fails to set queue acls
+   */
+  public static void setQueueAcls(YarnAuthorizationProvider authorizer,
+      Map<String, CSQueue> queues) throws IOException {
+    List<Permission> permissions = new ArrayList<>();
+    for (CSQueue queue : queues.values()) {
+      AbstractCSQueue csQueue = (AbstractCSQueue) queue;
+      permissions.add(
+          new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
+    }
+    authorizer.setPermission(permissions,
+        UserGroupInformation.getCurrentUser());
+  }
+
+  /**
+   * Check that the String provided in input is the name of an existing,
+   * LeafQueue, if successful returns the queue.
+   *
+   * @param queue the queue name
+   * @return the LeafQueue
+   * @throws YarnException if the queue does not exist or the queue
+   *           is not the type of LeafQueue.
+   */
+  public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+    CSQueue ret = this.getQueue(queue);
+    if (ret == null) {
+      throw new YarnException("The specified Queue: " + queue
+          + " doesn't exist");
+    }
+    if (!(ret instanceof LeafQueue)) {
+      throw new YarnException("The specified Queue: " + queue
+          + " is not a Leaf Queue.");
+    }
+    return (LeafQueue) ret;
+  }
+
+  /**
+   * Get the default priority of the queue.
+   * @param queueName the queue name
+   * @return the default priority of the queue
+   */
+  public Priority getDefaultPriorityForQueue(String queueName) {
+    Queue queue = getQueue(queueName);
+    if (null == queue || null == queue.getDefaultApplicationPriority()) {
+      // Return with default application priority
+      return Priority.newInstance(CapacitySchedulerConfiguration
+          .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
+    }
+    return Priority.newInstance(queue.getDefaultApplicationPriority()
+        .getPriority());
+  }
+
+  /**
+   * Get a map of queueToLabels.
+   * @return the map of queueToLabels
+   */
+  private Map<String, Set<String>> getQueueToLabels() {
+    Map<String, Set<String>> queueToLabels = new HashMap<>();
+    for (CSQueue queue :  getQueues().values()) {
+      queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
+    }
+    return queueToLabels;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 11e94b9..7382f3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -111,7 +111,8 @@ public class TestApplicationLimits {
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
     when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
@@ -123,9 +124,9 @@ public class TestApplicationLimits {
         containerTokenSecretManager);
 
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, 
+    CSQueue root = CapacitySchedulerQueueManager
+        .parseQueue(csContext, csConf, null, "root",
+            queues, queues,
             TestUtils.spyHook);
 
     
@@ -276,7 +277,8 @@ public class TestApplicationLimits {
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 16));
     when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -288,8 +290,8 @@ public class TestApplicationLimits {
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, TestUtils.spyHook);
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            "root", queues, queues, TestUtils.spyHook);
 
     LeafQueue queue = (LeafQueue)queues.get(A);
     
@@ -356,9 +358,9 @@ public class TestApplicationLimits {
         + ".maximum-am-resource-percent", 0.5f);
     // Re-create queues to get new configs.
     queues = new HashMap<String, CSQueue>();
-    root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, TestUtils.spyHook);
+    root = CapacitySchedulerQueueManager.parseQueue(
+        csContext, csConf, null, "root",
+        queues, queues, TestUtils.spyHook);
     clusterResource = Resources.createResource(100 * 16 * GB);
 
     queue = (LeafQueue)queues.get(A);
@@ -378,9 +380,9 @@ public class TestApplicationLimits {
         9999);
     // Re-create queues to get new configs.
     queues = new HashMap<String, CSQueue>();
-    root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, TestUtils.spyHook);
+    root = CapacitySchedulerQueueManager.parseQueue(
+        csContext, csConf, null, "root",
+        queues, queues, TestUtils.spyHook);
 
     queue = (LeafQueue)queues.get(A);
     assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
@@ -580,7 +582,8 @@ public class TestApplicationLimits {
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB));
     when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     
@@ -589,8 +592,8 @@ public class TestApplicationLimits {
     when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
-        "root", queues, queues, TestUtils.spyHook);
+    CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
+        csConf, null, "root", queues, queues, TestUtils.spyHook);
 
     ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
     when(csContext.getClusterResourceUsage())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index d335552..5c53fda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -595,7 +595,8 @@ public class TestApplicationLimitsByPartition {
     when(csContext.getMaximumResourceCapability())
         .thenReturn(Resources.createResource(16 * GB));
     when(csContext.getNonPartitionedQueueComparator())
-        .thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        .thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     RMContext rmContext = TestUtils.getMockRMContext();
     RMContext spyRMContext = spy(rmContext);
@@ -614,8 +615,8 @@ public class TestApplicationLimitsByPartition {
     when(csContext.getClusterResource()).thenReturn(clusterResource);
 
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
-    CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
-        "root", queues, queues, TestUtils.spyHook);
+    CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
+        csConf, null, "root", queues, queues, TestUtils.spyHook);
 
     ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
     when(csContext.getClusterResourceUsage())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 40e5d2a..a6ae0c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -95,11 +95,12 @@ public class TestChildQueueOrder {
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
-    thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
+        thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getNonPartitionedQueueComparator()).
-    thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
-    thenReturn(resourceComparator);
+        thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
   }
@@ -222,7 +223,7 @@ public class TestChildQueueOrder {
     setupSortedQueues(csConf);
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
-      CapacityScheduler.parseQueue(csContext, csConf, null, 
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues, 
           TestUtils.spyHook);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 8694efb..2ce5fcb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -175,7 +175,8 @@ public class TestLeafQueue {
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -188,7 +189,7 @@ public class TestLeafQueue {
         containerTokenSecretManager);
 
     root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, 
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, 
             queues, queues, 
             TestUtils.spyHook);
@@ -2380,7 +2381,7 @@ public class TestLeafQueue {
         .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot =
-        CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT,
             newQueues, queues,
             TestUtils.spyHook);
@@ -2405,7 +2406,7 @@ public class TestLeafQueue {
         .NODE_LOCALITY_DELAY, 60);
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot =
-        CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT,
             newQueues, queues,
             TestUtils.spyHook);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index d875969..a36db44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -97,10 +97,11 @@ public class TestParentQueue {
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getNonPartitionedQueueComparator()).
-    thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+        thenReturn(
+            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getResourceCalculator()).
-    thenReturn(resourceComparator);
+        thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);
   }
   
@@ -231,7 +232,7 @@ public class TestParentQueue {
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root =
-        CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues, 
             TestUtils.spyHook);
 
@@ -346,7 +347,7 @@ public class TestParentQueue {
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     boolean exceptionOccured = false;
     try {
-      CapacityScheduler.parseQueue(csContext, csConf, null,
+      CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues,
           TestUtils.spyHook);
     } catch (IllegalArgumentException ie) {
@@ -360,7 +361,7 @@ public class TestParentQueue {
     exceptionOccured = false;
     queues.clear();
     try {
-      CapacityScheduler.parseQueue(csContext, csConf, null,
+      CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues,
           TestUtils.spyHook);
     } catch (IllegalArgumentException ie) {
@@ -374,7 +375,7 @@ public class TestParentQueue {
     exceptionOccured = false;
     queues.clear();
     try {
-      CapacityScheduler.parseQueue(csContext, csConf, null,
+      CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues,
           TestUtils.spyHook);
     } catch (IllegalArgumentException ie) {
@@ -467,7 +468,7 @@ public class TestParentQueue {
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root =
-        CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
             TestUtils.spyHook);
     
@@ -623,8 +624,8 @@ public class TestParentQueue {
     csConf.setCapacity(Q_B + "." + B3, 0);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); 
-    CapacityScheduler.parseQueue(csContext, csConf, null, 
-        CapacitySchedulerConfiguration.ROOT, queues, queues, 
+    CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+        CapacitySchedulerConfiguration.ROOT, queues, queues,
         TestUtils.spyHook);
   }
   
@@ -640,8 +641,8 @@ public class TestParentQueue {
     csConf.setCapacity(Q_A, 60);
 
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); 
-    CapacityScheduler.parseQueue(csContext, csConf, null, 
-        CapacitySchedulerConfiguration.ROOT, queues, queues, 
+    CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+        CapacitySchedulerConfiguration.ROOT, queues, queues,
         TestUtils.spyHook);
   }
   
@@ -662,8 +663,8 @@ public class TestParentQueue {
 
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); 
     try {
-      CapacityScheduler.parseQueue(csContext, csConf, null, 
-          CapacitySchedulerConfiguration.ROOT, queues, queues, 
+      CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+          CapacitySchedulerConfiguration.ROOT, queues, queues,
           TestUtils.spyHook);
     } catch (IllegalArgumentException e) {
       fail("Failed to create queues with 0 capacity: " + e);
@@ -678,7 +679,7 @@ public class TestParentQueue {
 
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root =
-        CapacityScheduler.parseQueue(csContext, csConf, null,
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
             TestUtils.spyHook);
 
@@ -754,8 +755,8 @@ public class TestParentQueue {
     //B3
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, 
-            CapacitySchedulerConfiguration.ROOT, queues, queues, 
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT, queues, queues,
             TestUtils.spyHook);
 
     // Setup some nodes
@@ -850,12 +851,12 @@ public class TestParentQueue {
 
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, 
-            CapacitySchedulerConfiguration.ROOT, queues, queues, 
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT, queues, queues,
             TestUtils.spyHook);
     YarnAuthorizationProvider authorizer =
         YarnAuthorizationProvider.getInstance(conf);
-    CapacityScheduler.setQueueAcls(authorizer, queues);
+    CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues);
 
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
     // Setup queue configs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index f6caa50..3e05456 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -134,7 +134,7 @@ public class TestReservations {
     when(csContext.getClusterResource()).thenReturn(
         Resources.createResource(100 * 16 * GB, 100 * 12));
     when(csContext.getNonPartitionedQueueComparator()).thenReturn(
-        CapacityScheduler.nonPartitionedQueueComparator);
+        CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
     when(csContext.getRMContext()).thenReturn(rmContext);
@@ -144,7 +144,7 @@ public class TestReservations {
     when(csContext.getContainerTokenSecretManager()).thenReturn(
         containerTokenSecretManager);
 
-    root = CapacityScheduler.parseQueue(csContext, csConf, null,
+    root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
 
     ResourceUsage queueResUsage = root.getQueueResourceUsage();
@@ -1180,8 +1180,8 @@ public class TestReservations {
     csConf.setBoolean(
         CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
-    CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
-        CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+    CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
+        csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
         TestUtils.spyHook);
     queues = newQueues;
     root.reinitialize(newRoot, cs.getClusterResource());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e34ee34..b982fab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -141,7 +141,7 @@ public class TestUtils {
   /**
    * Hook to spy on queues.
    */
-  static class SpyHook extends CapacityScheduler.QueueHook {
+  static class SpyHook extends CapacitySchedulerQueueManager.QueueHook {
     @Override
     public CSQueue hook(CSQueue queue) {
       return spy(queue);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message