hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cur...@apache.org
Subject [39/50] [abbrv] git commit: YARN-1707. Introduce APIs to add/remove/resize queues in the CapacityScheduler. Contributed by Carlo Curino and Subru Krishnan
Date Mon, 22 Sep 2014 23:48:28 GMT
YARN-1707. Introduce APIs to add/remove/resize queues in the CapacityScheduler. Contributed by Carlo Curino and Subru Krishnan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/60d4f29d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/60d4f29d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/60d4f29d

Branch: refs/heads/YARN-1051
Commit: 60d4f29d96c39a7be0e7f0c2a708ad4c8519be89
Parents: 43efdd3
Author: carlo curino <Carlo Curino>
Authored: Fri Sep 12 16:43:38 2014 -0700
Committer: carlo curino <Carlo Curino>
Committed: Mon Sep 22 16:28:48 2014 -0700

----------------------------------------------------------------------
 .../scheduler/AbstractYarnScheduler.java        |  25 ++
 .../SchedulerDynamicEditException.java          |  13 +
 .../scheduler/YarnScheduler.java                |  42 +++
 .../scheduler/capacity/CapacityScheduler.java   | 209 ++++++++++++--
 .../scheduler/capacity/LeafQueue.java           |  28 +-
 .../scheduler/capacity/ParentQueue.java         |   8 +-
 .../scheduler/capacity/PlanQueue.java           | 193 +++++++++++++
 .../scheduler/capacity/ReservationQueue.java    |  98 +++++++
 .../scheduler/common/QueueEntitlement.java      |  28 ++
 .../webapp/dao/CapacitySchedulerQueueInfo.java  |   9 +
 .../capacity/TestCapacityScheduler.java         |   2 +-
 .../TestCapacitySchedulerDynamicBehavior.java   | 282 +++++++++++++++++++
 .../capacity/TestReservationQueue.java          | 103 +++++++
 13 files changed, 1012 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 0b5447b..8e8d627 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -220,6 +221,24 @@ public abstract class AbstractYarnScheduler
         + " does not support moving apps between queues");
   }
 
+  public void removeQueue(String queueName) throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support removing queues");
+  }
+
+  @Override
+  public void addQueue(Queue newQueue) throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support this operation");
+  }
+
+  @Override
+  public void setEntitlement(String queue, QueueEntitlement entitlement)
+      throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support this operation");
+  }
+
   private void killOrphanContainerOnNode(RMNode node,
       NMContainerStatus container) {
     if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
@@ -503,4 +522,10 @@ public abstract class AbstractYarnScheduler
   public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
     return EnumSet.of(SchedulerResourceTypes.MEMORY);
   }
+
+  @Override
+  public Set<String> getPlanQueues() throws YarnException {
+    throw new YarnException(getClass().getSimpleName()
+        + " does not support reservations");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java
new file mode 100644
index 0000000..42dc36a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java
@@ -0,0 +1,13 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class SchedulerDynamicEditException extends YarnException {
+
+  private static final long serialVersionUID = 7100374511387193257L;
+
+  public SchedulerDynamicEditException(String string) {
+    super(string);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index b6c1018..d1b5275 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
@@ -224,6 +226,46 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   void killAllAppsInQueue(String queueName) throws YarnException;
 
   /**
+   * Remove an existing queue. Implementations might limit when a queue could be
+   * removed (e.g., must have zero entitlement, and no applications running, or
+   * must be a leaf, etc..).
+   *
+   * @param queueName name of the queue to remove
+   * @throws YarnException
+   */
+  void removeQueue(String queueName) throws YarnException;
+
+  /**
+   * Add to the scheduler a new Queue. Implementations might limit what type of
+   * queues can be dynamically added (e.g., Queue must be a leaf, must be
+   * attached to existing parent, must have zero entitlement).
+   *
+   * @param newQueue the queue being added.
+   * @throws YarnException
+   */
+  void addQueue(Queue newQueue) throws YarnException;
+
+  /**
+   * This method increase the entitlement for current queue (must respect
+   * invariants, e.g., no overcommit of parents, non negative, etc.).
+   * Entitlement is a general term for weights in FairScheduler, capacity for
+   * the CapacityScheduler, etc.
+   *
+   * @param queue the queue for which we change entitlement
+   * @param entitlement the new entitlement for the queue (capacity,
+   *              maxCapacity, etc..)
+   * @throws YarnException
+   */
+  void setEntitlement(String queue, QueueEntitlement entitlement)
+      throws YarnException;
+
+  /**
+   * Gets the list of names for queues managed by the Reservation System
+   * @return the list of queues which support reservations
+   */
+  public Set<String> getPlanQueues() throws YarnException;  
+
+  /**
    * Return a collection of the resource types that are considered when
    * scheduling
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bdfc819..522b975 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -24,6 +24,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -90,6 +92,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -473,9 +480,12 @@ public class CapacityScheduler extends
   private void validateExistingQueues(
       Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) 
   throws IOException {
-    for (String queue : queues.keySet()) {
-      if (!newQueues.containsKey(queue)) {
-        throw new IOException(queue + " cannot be found during refresh!");
+    // check that all static queues are included in the newQueues list
+    for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
+      if (!(e.getValue() instanceof ReservationQueue)) {
+        if (!newQueues.containsKey(e.getKey())) {
+          throw new IOException(e.getKey() + " cannot be found during refresh!");
+        }
       }
     }
   }
@@ -507,26 +517,42 @@ public class CapacityScheduler extends
       Map<String, CSQueue> oldQueues, 
       QueueHook hook) throws IOException {
     CSQueue queue;
+    String fullQueueName =
+        (parent == null) ? queueName
+            : (parent.getQueuePath() + "." + queueName);
     String[] childQueueNames = 
-      conf.getQueues((parent == null) ? 
-          queueName : (parent.getQueuePath()+"."+queueName));
+      conf.getQueues(fullQueueName);
+    boolean isReservableQueue = conf.isReservableQueue(fullQueueName);
     if (childQueueNames == null || childQueueNames.length == 0) {
       if (null == parent) {
         throw new IllegalStateException(
             "Queue configuration missing child queue names for " + queueName);
       }
-      queue = 
-          new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
-      
-      // Used only for unit tests
-      queue = hook.hook(queue);
+      // Check if the queue will be dynamically managed by the Reservation
+      // system
+     if (isReservableQueue) {
+        queue =
+            new PlanQueue(csContext, queueName, parent,
+                oldQueues.get(queueName));
+      } else {
+        queue =
+            new LeafQueue(csContext, queueName, parent,
+                oldQueues.get(queueName));
+
+        // Used only for unit tests
+        queue = hook.hook(queue);
+      }
     } else {
+      if (isReservableQueue) {
+        throw new IllegalStateException(
+            "Only Leaf Queues can be reservable for " + queueName);
+      }
       ParentQueue parentQueue = 
         new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
 
       // Used only for unit tests
       queue = hook.hook(parentQueue);
-      
+
       List<CSQueue> childQueues = new ArrayList<CSQueue>();
       for (String childQueueName : childQueueNames) {
         CSQueue childQueue = 
@@ -548,7 +574,7 @@ public class CapacityScheduler extends
     return queue;
   }
 
-  synchronized CSQueue getQueue(String queueName) {
+  public synchronized CSQueue getQueue(String queueName) {
     if (queueName == null) {
       return null;
     }
@@ -716,7 +742,7 @@ public class CapacityScheduler extends
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application Attempt " + applicationAttemptId + " is done." +
-    		" finalState=" + rmAppAttemptFinalState);
+        " finalState=" + rmAppAttemptFinalState);
     
     FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
     SchedulerApplication<FiCaSchedulerApp> application =
@@ -995,9 +1021,16 @@ public class CapacityScheduler extends
     case APP_ADDED:
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(),
-        appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
+      String queueName =
+          resolveReservationQueueName(appAddedEvent.getQueue(),
+              appAddedEvent.getApplicationId(),
+              appAddedEvent.getReservationID());
+      if (queueName != null) {
+        addApplication(appAddedEvent.getApplicationId(),
+            queueName,
+            appAddedEvent.getUser(),
+            appAddedEvent.getIsAppRecovering());
+      }
     }
     break;
     case APP_REMOVED:
@@ -1230,6 +1263,123 @@ public class CapacityScheduler extends
     }
   }
 
+  private synchronized String resolveReservationQueueName(String queueName,
+      ApplicationId applicationId, ReservationId reservationID) {
+    CSQueue queue = getQueue(queueName);
+    // Check if the queue is a plan queue
+    if ((queue == null) || !(queue instanceof PlanQueue)) {
+      return queueName;
+    }
+    if (reservationID != null) {
+      String resQName = reservationID.toString();
+      queue = getQueue(resQName);
+      if (queue == null) {
+        String message =
+            "Application "
+                + applicationId
+                + " submitted to a reservation which is not yet currently active: "
+                + resQName;
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return null;
+      }
+      // use the reservation queue to run the app
+      queueName = resQName;
+    } else {
+      // use the default child queue of the plan for unreserved apps
+      queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    }
+    return queueName;
+  }
+
+  @Override
+  public synchronized void removeQueue(String queueName)
+      throws SchedulerDynamicEditException {
+    LOG.info("Removing queue: " + queueName);
+    CSQueue q = this.getQueue(queueName);
+    if (!(q instanceof ReservationQueue)) {
+      throw new SchedulerDynamicEditException("The queue that we are asked "
+          + "to remove (" + queueName + ") is not a ReservationQueue");
+    }
+    ReservationQueue disposableLeafQueue = (ReservationQueue) q;
+    // at this point we should have no more apps
+    if (disposableLeafQueue.getNumApplications() > 0) {
+      throw new SchedulerDynamicEditException("The queue " + queueName
+          + " is not empty " + disposableLeafQueue.getApplications().size()
+          + " active apps " + disposableLeafQueue.pendingApplications.size()
+          + " pending apps");
+    }
+
+    ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
+    this.queues.remove(queueName);
+    LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
+  }
+
+  @Override
+  public synchronized void addQueue(Queue queue)
+      throws SchedulerDynamicEditException {
+
+    if (!(queue instanceof ReservationQueue)) {
+      throw new SchedulerDynamicEditException("Queue " + queue.getQueueName()
+          + " is not a ReservationQueue");
+    }
+
+    ReservationQueue newQueue = (ReservationQueue) queue;
+
+    if (newQueue.getParent() == null
+        || !(newQueue.getParent() instanceof PlanQueue)) {
+      throw new SchedulerDynamicEditException("ParentQueue for "
+          + newQueue.getQueueName()
+          + " is not properly set (should be set and be a PlanQueue)");
+    }
+
+    PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
+    String queuename = newQueue.getQueueName();
+    parentPlan.addChildQueue(newQueue);
+    this.queues.put(queuename, newQueue);
+    LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
+  }
+
+  @Override
+  public synchronized void setEntitlement(String inQueue,
+      QueueEntitlement entitlement) throws SchedulerDynamicEditException,
+      YarnException {
+    LeafQueue queue = getAndCheckLeafQueue(inQueue);
+    ParentQueue parent = (ParentQueue) queue.getParent();
+
+    if (!(queue instanceof ReservationQueue)) {
+      throw new SchedulerDynamicEditException("Entitlement can not be"
+          + " modified dynamically since queue " + inQueue
+          + " is not a ReservationQueue");
+    }
+
+    if (!(parent instanceof PlanQueue)) {
+      throw new SchedulerDynamicEditException("The parent of ReservationQueue "
+          + inQueue + " must be an PlanQueue");
+    }
+
+    ReservationQueue newQueue = (ReservationQueue) queue;
+
+    float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
+    float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity();
+
+    if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
+      // note: epsilon checks here are not ok, as the epsilons might accumulate
+      // and become a problem in aggregate
+      if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
+          && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
+        return;
+      }
+      newQueue.setEntitlement(entitlement);
+    } else {
+      throw new SchedulerDynamicEditException(
+          "Sum of child queues would exceed 100% for PlanQueue: "
+              + parent.getQueueName());
+    }
+    LOG.info("Set entitlement for ReservationQueue " + inQueue + "  to "
+        + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")");
+  }
+
   @Override
   public synchronized String moveApplication(ApplicationId appId,
       String targetQueueName) throws YarnException {
@@ -1237,11 +1387,12 @@ public class CapacityScheduler extends
         getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
     String sourceQueueName = app.getQueue().getQueueName();
     LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
-    LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+    String destQueueName = handleMoveToPlanQueue(targetQueueName);
+    LeafQueue dest = getAndCheckLeafQueue(destQueueName);
     // Validation check - ACLs, submission limits for user & queue
     String user = app.getUser();
     try {
-      dest.submitApplication(appId, user, targetQueueName);
+      dest.submitApplication(appId, user, destQueueName);
     } catch (AccessControlException e) {
       throw new YarnException(e);
     }
@@ -1260,7 +1411,7 @@ public class CapacityScheduler extends
     dest.submitApplicationAttempt(app, user);
     applications.get(appId).setQueue(dest);
     LOG.info("App: " + app.getApplicationId() + " successfully moved from "
-        + sourceQueueName + " to: " + targetQueueName);
+        + sourceQueueName + " to: " + destQueueName);
     return targetQueueName;
   }
 
@@ -1295,4 +1446,24 @@ public class CapacityScheduler extends
     return EnumSet
       .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
   }
+  
+  private String handleMoveToPlanQueue(String targetQueueName) {
+    CSQueue dest = getQueue(targetQueueName);
+    if (dest != null && dest instanceof PlanQueue) {
+      // use the default child reservation queue of the plan
+      targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    }
+    return targetQueueName;
+  }
+
+  @Override
+  public Set<String> getPlanQueues() {
+    Set<String> ret = new HashSet<String>();
+    for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
+      if (l.getValue() instanceof PlanQueue) {
+        ret.add(l.getKey());
+      }
+    }
+    return ret;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c93c5f..b5268ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -85,8 +85,8 @@ public class LeafQueue implements CSQueue {
   private int userLimit;
   private float userLimitFactor;
 
-  private int maxApplications;
-  private int maxApplicationsPerUser;
+  protected int maxApplications;
+  protected int maxApplicationsPerUser;
   
   private float maxAMResourcePerQueuePercent;
   private int maxActiveApplications; // Based on absolute max capacity
@@ -150,8 +150,7 @@ public class LeafQueue implements CSQueue {
             Resources.subtract(maximumAllocation, minimumAllocation), 
             maximumAllocation);
 
-    float capacity = 
-      (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
+    float capacity = getCapacityFromConf();
     float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
 
     float maximumCapacity = 
@@ -217,6 +216,11 @@ public class LeafQueue implements CSQueue {
     this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
   }
 
+  // externalizing in method, to allow overriding
+  protected float getCapacityFromConf() {
+    return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
+  }
+
   private synchronized void setupQueueConfigs(
       Resource clusterResource,
       float capacity, float absoluteCapacity, 
@@ -475,7 +479,7 @@ public class LeafQueue implements CSQueue {
    * Set user limit factor - used only for testing.
    * @param userLimitFactor new user limit factor
    */
-  synchronized void setUserLimitFactor(int userLimitFactor) {
+  synchronized void setUserLimitFactor(float userLimitFactor) {
     this.userLimitFactor = userLimitFactor;
   }
 
@@ -817,7 +821,7 @@ public class LeafQueue implements CSQueue {
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
         return assignReservedContainer(application, node, reservedContainer,
-          clusterResource);
+            clusterResource);
       }
     }
     
@@ -1661,4 +1665,16 @@ public class LeafQueue implements CSQueue {
       getParent().detachContainer(clusterResource, application, rmContainer);
     }
   }
+
+  public void setCapacity(float capacity) {
+    this.capacity = capacity;
+  }
+
+  public void setAbsoluteCapacity(float absoluteCapacity) {
+    this.absoluteCapacity = absoluteCapacity;
+  }
+
+  public void setMaxApplications(int maxApplications) {
+    this.maxApplications = maxApplications;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 8c654b7..7e36b5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -75,7 +75,7 @@ public class ParentQueue implements CSQueue {
 
   private float usedCapacity = 0.0f;
 
-  private final Set<CSQueue> childQueues;
+  protected final Set<CSQueue> childQueues;
   private final Comparator<CSQueue> queueComparator;
   
   private Resource usedResources = Resources.createResource(0, 0);
@@ -156,7 +156,7 @@ public class ParentQueue implements CSQueue {
         ", fullname=" + getQueuePath()); 
   }
 
-  private synchronized void setupQueueConfigs(
+  protected synchronized void setupQueueConfigs(
       Resource clusterResource,
       float capacity, float absoluteCapacity, 
       float maximumCapacity, float absoluteMaxCapacity,
@@ -824,4 +824,8 @@ public class ParentQueue implements CSQueue {
       }
     }
   }
+
+  public Map<QueueACL, AccessControlList> getACLs() {
+    return acls;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
new file mode 100644
index 0000000..4ada778
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -0,0 +1,193 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a dynamic queue managed by the {@link ReservationSystem}.
+ * From the user perspective this is equivalent to a LeafQueue that respect
+ * reservations, but functionality wise is a sub-class of ParentQueue
+ *
+ */
+public class PlanQueue extends ParentQueue {
+
+  public static final String DEFAULT_QUEUE_SUFFIX = "-default";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
+
+  private int maxAppsForReservation;
+  private int maxAppsPerUserForReservation;
+  private int userLimit;
+  private float userLimitFactor;
+  protected CapacitySchedulerContext schedulerContext;
+  private boolean showReservationsAsQueues;
+
+  public PlanQueue(CapacitySchedulerContext cs, String queueName,
+      CSQueue parent, CSQueue old) {
+    super(cs, queueName, parent, old);
+
+    this.schedulerContext = cs;
+    // Set the reservation queue attributes for the Plan
+    CapacitySchedulerConfiguration conf = cs.getConfiguration();
+    String queuePath = super.getQueuePath();
+    int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
+    showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
+    if (maxAppsForReservation < 0) {
+      maxAppsForReservation =
+          (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
+              .getAbsoluteCapacity());
+    }
+    int userLimit = conf.getUserLimit(queuePath);
+    float userLimitFactor = conf.getUserLimitFactor(queuePath);
+    int maxAppsPerUserForReservation =
+        (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
+    updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
+        maxAppsPerUserForReservation);
+
+    StringBuffer queueInfo = new StringBuffer();
+    queueInfo.append("Created Plan Queue: ").append(queueName)
+        .append("\nwith capacity: [").append(super.getCapacity())
+        .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
+        .append("\nwith max reservation apps: [").append(maxAppsForReservation)
+        .append("]\nwith max reservation apps per user: [")
+        .append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
+        .append(userLimit).append("]\nwith user limit factor: [")
+        .append(userLimitFactor).append("].");
+    LOG.info(queueInfo.toString());
+  }
+
+  @Override
+  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+      Resource clusterResource) throws IOException {
+    // Sanity check
+    if (!(newlyParsedQueue instanceof PlanQueue)
+        || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
+      throw new IOException("Trying to reinitialize " + getQueuePath()
+          + " from " + newlyParsedQueue.getQueuePath());
+    }
+
+    PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
+
+    if (newlyParsedParentQueue.getChildQueues().size() > 0) {
+      throw new IOException(
+          "Reservable Queue should not have sub-queues in the"
+              + "configuration");
+    }
+
+    // Set new configs
+    setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(),
+        newlyParsedParentQueue.getAbsoluteCapacity(),
+        newlyParsedParentQueue.getMaximumCapacity(),
+        newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
+        newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs());
+
+    updateQuotas(newlyParsedParentQueue.userLimit,
+        newlyParsedParentQueue.userLimitFactor,
+        newlyParsedParentQueue.maxAppsForReservation,
+        newlyParsedParentQueue.maxAppsPerUserForReservation);
+
+    // run reinitialize on each existing queue, to trigger absolute cap
+    // recomputations
+    for (CSQueue res : this.getChildQueues()) {
+      res.reinitialize(res, clusterResource);
+    }
+    showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
+  }
+
+  synchronized void addChildQueue(CSQueue newQueue)
+      throws SchedulerDynamicEditException {
+    if (newQueue.getCapacity() > 0) {
+      throw new SchedulerDynamicEditException("Queue " + newQueue
+          + " being added has non zero capacity.");
+    }
+    boolean added = this.childQueues.add(newQueue);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("updateChildQueues (action: add queue): " + added + " "
+          + getChildQueuesToPrint());
+    }
+  }
+
+  synchronized void removeChildQueue(CSQueue remQueue)
+      throws SchedulerDynamicEditException {
+    if (remQueue.getCapacity() > 0) {
+      throw new SchedulerDynamicEditException("Queue " + remQueue
+          + " being removed has non zero capacity.");
+    }
+    Iterator<CSQueue> qiter = childQueues.iterator();
+    while (qiter.hasNext()) {
+      CSQueue cs = qiter.next();
+      if (cs.equals(remQueue)) {
+        qiter.remove();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removed child queue: {}", cs.getQueueName());
+        }
+      }
+    }
+  }
+
+  protected synchronized float sumOfChildCapacities() {
+    float ret = 0;
+    for (CSQueue l : childQueues) {
+      ret += l.getCapacity();
+    }
+    return ret;
+  }
+
+  private void updateQuotas(int userLimit, float userLimitFactor,
+      int maxAppsForReservation, int maxAppsPerUserForReservation) {
+    this.userLimit = userLimit;
+    this.userLimitFactor = userLimitFactor;
+    this.maxAppsForReservation = maxAppsForReservation;
+    this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
+  }
+
+  /**
+   * Number of maximum applications for each of the reservations in this Plan.
+   *
+   * @return maxAppsForreservation
+   */
+  public int getMaxApplicationsForReservations() {
+    return maxAppsForReservation;
+  }
+
+  /**
+   * Number of maximum applications per user for each of the reservations in
+   * this Plan.
+   *
+   * @return maxAppsPerUserForreservation
+   */
+  public int getMaxApplicationsPerUserForReservation() {
+    return maxAppsPerUserForReservation;
+  }
+
+  /**
+   * User limit value for each of the reservations in this Plan.
+   *
+   * @return userLimit
+   */
+  public int getUserLimitForReservation() {
+    return userLimit;
+  }
+
+  /**
+   * User limit factor value for each of the reservations in this Plan.
+   *
+   * @return userLimitFactor
+   */
+  public float getUserLimitFactor() {
+    return userLimitFactor;
+  }
+
+  /**
+   * Determine whether to hide/show the ReservationQueues
+   */
+  public boolean showReservationsAsQueues() {
+    return showReservationsAsQueues;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
new file mode 100644
index 0000000..48733fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -0,0 +1,98 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a dynamic {@link LeafQueue} managed by the
+ * {@link ReservationSystem}
+ *
+ */
+public class ReservationQueue extends LeafQueue {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ReservationQueue.class);
+
+  private PlanQueue parent;
+
+  private int maxSystemApps;
+
+  public ReservationQueue(CapacitySchedulerContext cs, String queueName,
+      PlanQueue parent) {
+    super(cs, queueName, parent, null);
+    maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
+    // the following parameters are common to all reservation in the plan
+    updateQuotas(parent.getUserLimitForReservation(),
+        parent.getUserLimitFactor(),
+        parent.getMaxApplicationsForReservations(),
+        parent.getMaxApplicationsPerUserForReservation());
+    this.parent = parent;
+  }
+
+  @Override
+  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+      Resource clusterResource) throws IOException {
+    // Sanity check
+    if (!(newlyParsedQueue instanceof ReservationQueue)
+        || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
+      throw new IOException("Trying to reinitialize " + getQueuePath()
+          + " from " + newlyParsedQueue.getQueuePath());
+    }
+    CSQueueUtils.updateQueueStatistics(
+        parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
+        parent, parent.schedulerContext.getClusterResource(),
+        parent.schedulerContext.getMinimumResourceCapability());
+    super.reinitialize(newlyParsedQueue, clusterResource);
+    updateQuotas(parent.getUserLimitForReservation(),
+        parent.getUserLimitFactor(),
+        parent.getMaxApplicationsForReservations(),
+        parent.getMaxApplicationsPerUserForReservation());
+  }
+
+  /**
+   * This methods to change capacity for a queue and adjusts its
+   * absoluteCapacity
+   * 
+   * @param entitlement the new entitlement for the queue (capacity,
+   *          maxCapacity, etc..)
+   * @throws SchedulerDynamicEditException
+   */
+  public synchronized void setEntitlement(QueueEntitlement entitlement)
+      throws SchedulerDynamicEditException {
+    float capacity = entitlement.getCapacity();
+    if (capacity < 0 || capacity > 1.0f) {
+      throw new SchedulerDynamicEditException(
+          "Capacity demand is not in the [0,1] range: " + capacity);
+    }
+    setCapacity(capacity);
+    setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
+    setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity()));
+    // note: we currently set maxCapacity to capacity
+    // this might be revised later
+    setMaxCapacity(entitlement.getMaxCapacity());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("successfully changed to " + capacity + " for queue "
+          + this.getQueueName());
+    }
+  }
+
+  private void updateQuotas(int userLimit, float userLimitFactor,
+      int maxAppsForReservation, int maxAppsPerUserForReservation) {
+    setUserLimit(userLimit);
+    setUserLimitFactor(userLimitFactor);
+    setMaxApplications(maxAppsForReservation);
+    maxApplicationsPerUser = maxAppsPerUserForReservation;
+  }
+
+  // used by the super constructor, we initialize to zero
+  protected float getCapacityFromConf() {
+    return 0f;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
new file mode 100644
index 0000000..a348e13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+public class QueueEntitlement {
+
+  private float capacity;
+  private float maxCapacity;
+
+  public QueueEntitlement(float capacity, float maxCapacity){
+    this.setCapacity(capacity);
+    this.maxCapacity = maxCapacity;
+   }
+
+  public float getMaxCapacity() {
+    return maxCapacity;
+  }
+
+  public void setMaxCapacity(float maxCapacity) {
+    this.maxCapacity = maxCapacity;
+  }
+
+  public float getCapacity() {
+    return capacity;
+  }
+
+  public void setCapacity(float capacity) {
+    this.capacity = capacity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java
index ac16ce0..0a8faad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -48,6 +49,7 @@ public class CapacitySchedulerQueueInfo {
   protected QueueState state;
   protected CapacitySchedulerQueueInfoList queues;
   protected ResourceInfo resourcesUsed;
+  private boolean hideReservationQueues = true;
 
   CapacitySchedulerQueueInfo() {
   };
@@ -69,6 +71,10 @@ public class CapacitySchedulerQueueInfo {
     queueName = q.getQueueName();
     state = q.getState();
     resourcesUsed = new ResourceInfo(q.getUsedResources());
+    if(q instanceof PlanQueue &&
+       ((PlanQueue)q).showReservationsAsQueues()) {
+      hideReservationQueues = false;
+    }
   }
 
   public float getCapacity() {
@@ -112,6 +118,9 @@ public class CapacitySchedulerQueueInfo {
   }
 
   public CapacitySchedulerQueueInfoList getQueues() {
+    if(hideReservationQueues) {
+      return new CapacitySchedulerQueueInfoList();
+    }
     return this.queues;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e029749..05bb427 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -408,7 +408,7 @@ public class TestCapacityScheduler {
     cs.stop();
   }
 
-  private void checkQueueCapacities(CapacityScheduler cs,
+  void checkQueueCapacities(CapacityScheduler cs,
       float capacityA, float capacityB) {
     CSQueue rootQueue = cs.getRootQueue();
     CSQueue queueA = findQueue(rootQueue, A);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
new file mode 100644
index 0000000..aecbfa8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCapacitySchedulerDynamicBehavior {
+  private static final Log LOG = LogFactory
+      .getLog(TestCapacitySchedulerDynamicBehavior.class);
+  private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+  private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+  private static final String B1 = B + ".b1";
+  private static final String B2 = B + ".b2";
+  private static final String B3 = B + ".b3";
+  private static float A_CAPACITY = 10.5f;
+  private static float B_CAPACITY = 89.5f;
+  private static float A1_CAPACITY = 30;
+  private static float A2_CAPACITY = 70;
+  private static float B1_CAPACITY = 79.2f;
+  private static float B2_CAPACITY = 0.8f;
+  private static float B3_CAPACITY = 20;
+
+  private final TestCapacityScheduler tcs = new TestCapacityScheduler();
+
+  private int GB = 1024;
+
+  private MockRM rm;
+
+  @Before
+  public void setUp() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupPlanQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATIONS_ENABLE, false);
+    rm = new MockRM(conf);
+    rm.start();
+  }
+
+  @Test
+  public void testRefreshQueuesWithReservations() throws Exception {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Test add one reservation dynamically and manually modify capacity
+    ReservationQueue a1 =
+        new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a1);
+    a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+
+    // Test add another reservation queue and use setEntitlement to modify
+    // capacity
+    ReservationQueue a2 =
+        new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a2);
+    cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
+
+    // Verify all allocations match
+    tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // Reinitialize and verify all dynamic queued survived
+    CapacitySchedulerConfiguration conf = cs.getConfiguration();
+    conf.setCapacity(A, 80f);
+    conf.setCapacity(B, 20f);
+    cs.reinitialize(conf, rm.getRMContext());
+
+    tcs.checkQueueCapacities(cs, 80f, 20f);
+  }
+
+  @Test
+  public void testAddQueueFailCases() throws Exception {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    try {
+      // Test invalid addition (adding non-zero size queue)
+      ReservationQueue a1 =
+          new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+      a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+      cs.addQueue(a1);
+      fail();
+    } catch (Exception e) {
+      // expected
+    }
+
+    // Test add one reservation dynamically and manually modify capacity
+    ReservationQueue a1 =
+        new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a1);
+    a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+
+    // Test add another reservation queue and use setEntitlement to modify
+    // capacity
+    ReservationQueue a2 =
+        new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
+
+    cs.addQueue(a2);
+
+    try {
+      // Test invalid entitlement (sum of queues exceed 100%)
+      cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100 + 0.1f,
+          1.0f));
+      fail();
+    } catch (Exception e) {
+      // expected
+    }
+
+    cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
+
+    // Verify all allocations match
+    tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    cs.stop();
+  }
+
+  @Test
+  public void testRemoveQueue() throws Exception {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Test add one reservation dynamically and manually modify capacity
+    ReservationQueue a1 =
+        new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+    cs.addQueue(a1);
+    a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = cs.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+    try {
+      cs.removeQueue("a1");
+      fail();
+    } catch (SchedulerDynamicEditException s) {
+      // expected a1 contains applications
+    }
+    // clear queue by killling all apps
+    cs.killAllAppsInQueue("a1");
+    // wait for events of move to propagate
+    rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+
+    try {
+      cs.removeQueue("a1");
+      fail();
+    } catch (SchedulerDynamicEditException s) {
+      // expected a1 is not zero capacity
+    }
+    // set capacity to zero
+    cs.setEntitlement("a1", new QueueEntitlement(0f, 0f));
+    cs.removeQueue("a1");
+
+    assertTrue(cs.getQueue("a1") == null);
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAppToPlanQueue() throws Exception {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "b1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(1, appsInB1.size());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertEquals(1, appsInB.size());
+    assertTrue(appsInB.contains(appAttemptId));
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    String queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("b1"));
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    // create the default reservation queue
+    String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    ReservationQueue defQ =
+        new ReservationQueue(scheduler, defQName,
+            (PlanQueue) scheduler.getQueue("a"));
+    scheduler.addQueue(defQ);
+    defQ.setEntitlement(new QueueEntitlement(1f, 1f));
+
+    List<ApplicationAttemptId> appsInDefQ = scheduler.getAppsInQueue(defQName);
+    assertTrue(appsInDefQ.isEmpty());
+
+    // now move the app to plan queue
+    scheduler.moveApplication(app.getApplicationId(), "a");
+
+    // check postconditions
+    appsInDefQ = scheduler.getAppsInQueue(defQName);
+    assertEquals(1, appsInDefQ.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInDefQ.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals(defQName));
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    rm.stop();
+  }
+
+  private void setupPlanQueueConfiguration(CapacitySchedulerConfiguration conf) {
+
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(B, new String[] { "b1", "b2", "b3" });
+    conf.setCapacity(B1, B1_CAPACITY);
+    conf.setUserLimitFactor(B1, 100.0f);
+    conf.setCapacity(B2, B2_CAPACITY);
+    conf.setUserLimitFactor(B2, 100.0f);
+    conf.setCapacity(B3, B3_CAPACITY);
+    conf.setUserLimitFactor(B3, 100.0f);
+
+    conf.setReservableQueue(A, true);
+    conf.setReservationWindow(A, 86400 * 1000);
+    conf.setAverageCapacity(A, 1.0f);
+
+    LOG.info("Setup a as a plan queue");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60d4f29d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
new file mode 100644
index 0000000..c53b7a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationQueue {
+
+  CapacitySchedulerConfiguration csConf;
+  CapacitySchedulerContext csContext;
+  final static int GB = 1024;
+  private final ResourceCalculator resourceCalculator =
+      new DefaultResourceCalculator();
+  ReservationQueue reservationQueue;
+
+  @Before
+  public void setup() {
+
+    // setup a context / conf
+    csConf = new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration();
+    csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(GB, 1));
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(16 * GB, 32));
+    when(csContext.getClusterResource()).thenReturn(
+        Resources.createResource(100 * 16 * GB, 100 * 32));
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+
+    // create a queue
+    PlanQueue pq = new PlanQueue(csContext, "root", null, null);
+    reservationQueue = new ReservationQueue(csContext, "a", pq);
+
+  }
+
+  @Test
+  public void testAddSubtractCapacity() throws Exception {
+
+    // verify that setting, adding, subtracting capacity works
+    reservationQueue.setCapacity(1.0F);
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() - 0.9 < CSQueueUtils.EPSILON);
+    reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f));
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f));
+    assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+        reservationQueue.getCapacity() < CSQueueUtils.EPSILON);
+
+    try {
+      reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
+      fail();
+    } catch (SchedulerDynamicEditException iae) {
+      // expected
+      assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+          reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    }
+
+    try {
+      reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
+      fail();
+    } catch (SchedulerDynamicEditException iae) {
+      // expected
+      assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
+          reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
+    }
+
+  }
+}


Mime
View raw message