hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [20/37] hadoop git commit: YARN-3026. Move application-specific container allocation logic from LeafQueue to FiCaSchedulerApp. Contributed by Wangda Tan
Date Mon, 27 Jul 2015 21:58:31 GMT
YARN-3026. Move application-specific container allocation logic from LeafQueue to FiCaSchedulerApp. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/83fe34ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/83fe34ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/83fe34ac

Branch: refs/heads/HDFS-7240
Commit: 83fe34ac0896cee0918bbfad7bd51231e4aec39b
Parents: fc42fa8
Author: Jian He <jianhe@apache.org>
Authored: Fri Jul 24 14:00:25 2015 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Fri Jul 24 14:00:25 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/resourcemanager/RMContextImpl.java   |   3 +-
 .../scheduler/ResourceLimits.java               |  19 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  27 +-
 .../scheduler/capacity/CSAssignment.java        |  12 +-
 .../capacity/CapacityHeadroomProvider.java      |  16 +-
 .../scheduler/capacity/CapacityScheduler.java   |  14 -
 .../scheduler/capacity/LeafQueue.java           | 833 +++----------------
 .../scheduler/capacity/ParentQueue.java         |  16 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 721 +++++++++++++++-
 .../capacity/TestApplicationLimits.java         |  15 +-
 .../capacity/TestCapacityScheduler.java         |   3 +-
 .../capacity/TestContainerAllocation.java       |  85 +-
 .../scheduler/capacity/TestLeafQueue.java       | 191 +----
 .../scheduler/capacity/TestReservations.java    | 111 +--
 .../scheduler/capacity/TestUtils.java           |  25 +-
 16 files changed, 1048 insertions(+), 1046 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d1546b2..cf00fe5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -345,6 +345,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3844. Make hadoop-yarn-project Native code -Wall-clean (Alan Burlison
     via Colin P. McCabe)
 
+    YARN-3026. Move application-specific container allocation logic from
+    LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 2f9209c..8cadc3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -292,7 +292,8 @@ public class RMContextImpl implements RMContext {
     activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
   }
 
-  void setScheduler(ResourceScheduler scheduler) {
+  @VisibleForTesting
+  public void setScheduler(ResourceScheduler scheduler) {
     activeServiceContext.setScheduler(scheduler);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 8074794..c545e9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -26,20 +26,25 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * that, it's not "extra") resource you can get.
  */
 public class ResourceLimits {
-  volatile Resource limit;
+  private volatile Resource limit;
 
   // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
   // config. This limit indicates how much we need to unreserve to allocate
   // another container.
   private volatile Resource amountNeededUnreserve;
 
+  // How much resource can be used for the next allocation. If this isn't
+  // enough for the next container allocation, you may need to consider
+  // unreserving some containers.
+  private volatile Resource headroom;
+
   public ResourceLimits(Resource limit) {
-    this.amountNeededUnreserve = Resources.none();
-    this.limit = limit;
+    this(limit, Resources.none());
   }
 
   public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
     this.amountNeededUnreserve = amountNeededUnreserve;
+    this.headroom = limit;
     this.limit = limit;
   }
 
@@ -47,6 +52,14 @@ public class ResourceLimits {
     return limit;
   }
 
+  public Resource getHeadroom() {
+    return headroom;
+  }
+
+  public void setHeadroom(Resource headroom) {
+    this.headroom = headroom;
+  }
+
   public Resource getAmountNeededUnreserve() {
     return amountNeededUnreserve;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 7f8e164..dcc4205 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -65,7 +65,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   volatile int numContainers;
   
   final Resource minimumAllocation;
-  Resource maximumAllocation;
+  volatile Resource maximumAllocation;
   QueueState state;
   final CSQueueMetrics metrics;
   protected final PrivilegedEntity queueEntity;
@@ -77,7 +77,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   Map<AccessType, AccessControlList> acls = 
       new HashMap<AccessType, AccessControlList>();
-  boolean reservationsContinueLooking;
+  volatile boolean reservationsContinueLooking;
   private boolean preemptionDisabled;
 
   // Track resource usage-by-label like used-resource/pending-resource, etc.
@@ -333,7 +333,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   @Private
-  public synchronized Resource getMaximumAllocation() {
+  public Resource getMaximumAllocation() {
     return maximumAllocation;
   }
   
@@ -448,13 +448,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      String nodePartition, ResourceLimits currentResourceLimits,
-      Resource nowRequired, Resource resourceCouldBeUnreserved,
+      String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved,
       SchedulingMode schedulingMode) {
-    // New total resource = used + required
-    Resource newTotalResource =
-        Resources.add(queueUsage.getUsed(nodePartition), nowRequired);
-
     // Get current limited resource: 
     // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
     // queues' max capacity.
@@ -470,8 +465,14 @@ public abstract class AbstractCSQueue implements CSQueue {
         getCurrentLimitResource(nodePartition, clusterResource,
             currentResourceLimits, schedulingMode);
 
-    if (Resources.greaterThan(resourceCalculator, clusterResource,
-        newTotalResource, currentLimitResource)) {
+    Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
+
+    // Set headroom for currentResourceLimits
+    currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource,
+        nowTotalUsed));
+
+    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+        nowTotalUsed, currentLimitResource)) {
 
       // if reservation continous looking enabled, check to see if could we
       // potentially use this node instead of a reserved node if the application
@@ -483,7 +484,7 @@ public abstract class AbstractCSQueue implements CSQueue {
               resourceCouldBeUnreserved, Resources.none())) {
         // resource-without-reserved = used - reserved
         Resource newTotalWithoutReservedResource =
-            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+            Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved);
 
         // when total-used-without-reserved-resource < currentLimit, we still
         // have chance to allocate on this node by unreserving some containers
@@ -498,8 +499,6 @@ public abstract class AbstractCSQueue implements CSQueue {
                 + newTotalWithoutReservedResource + ", maxLimitCapacity: "
                 + currentLimitResource);
           }
-          currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
-            currentLimitResource));
           return true;
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 2ba2709..ceb6f7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -31,8 +31,8 @@ public class CSAssignment {
 
   final private Resource resource;
   private NodeType type;
-  private final RMContainer excessReservation;
-  private final FiCaSchedulerApp application;
+  private RMContainer excessReservation;
+  private FiCaSchedulerApp application;
   private final boolean skipped;
   private boolean fulfilledReservation;
   private final AssignmentInformation assignmentInformation;
@@ -80,10 +80,18 @@ public class CSAssignment {
     return application;
   }
 
+  public void setApplication(FiCaSchedulerApp application) {
+    this.application = application;
+  }
+
   public RMContainer getExcessReservation() {
     return excessReservation;
   }
 
+  public void setExcessReservation(RMContainer rmContainer) {
+    excessReservation = rmContainer;
+  }
+
   public boolean getSkipped() {
     return skipped;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index c6524c6..a3adf9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -25,22 +25,16 @@ public class CapacityHeadroomProvider {
   LeafQueue.User user;
   LeafQueue queue;
   FiCaSchedulerApp application;
-  Resource required;
   LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
   
-  public CapacityHeadroomProvider(
-    LeafQueue.User user,
-    LeafQueue queue,
-    FiCaSchedulerApp application,
-    Resource required,
-    LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
-    
+  public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
+      FiCaSchedulerApp application,
+      LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
+
     this.user = user;
     this.queue = queue;
     this.application = application;
-    this.required = required;
     this.queueResourceLimitsInfo = queueResourceLimitsInfo;
-    
   }
   
   public Resource getHeadroom() {
@@ -52,7 +46,7 @@ public class CapacityHeadroomProvider {
       clusterResource = queueResourceLimitsInfo.getClusterResource();
     }
     Resource headroom = queue.getHeadroom(user, queueCurrentLimit, 
-      clusterResource, application, required);
+      clusterResource, application);
     
     // Corner case to deal with applications being slightly over-limit
     if (headroom.getMemory() < 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5a20f8b..68e608a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1178,16 +1178,6 @@ public class CapacityScheduler extends
         updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
         schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
       }
-
-      RMContainer excessReservation = assignment.getExcessReservation();
-      if (excessReservation != null) {
-        Container container = excessReservation.getContainer();
-        queue.completedContainer(clusterResource, assignment.getApplication(),
-            node, excessReservation, SchedulerUtils
-                .createAbnormalContainerStatus(container.getId(),
-                    SchedulerUtils.UNRESERVED_CONTAINER),
-            RMContainerEventType.RELEASED, null, true);
-      }
     }
 
     // Try to schedule more if there are no reservations to fulfill
@@ -1241,10 +1231,6 @@ public class CapacityScheduler extends
                 RMNodeLabelsManager.NO_LABEL, clusterResource)),
             SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
         updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
-        if (Resources.greaterThan(calculator, clusterResource,
-            assignment.getResource(), Resources.none())) {
-          return;
-        }
       }
     } else {
       LOG.info("Skipping scheduling since node "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c283f4..acfbad0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -31,7 +31,6 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -42,30 +41,24 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -93,7 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
   
   private float maxAMResourcePerQueuePercent;
   
-  private int nodeLocalityDelay;
+  private volatile int nodeLocalityDelay;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -102,7 +95,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   Set<FiCaSchedulerApp> pendingApplications;
   
-  private float minimumAllocationFactor;
+  private volatile float minimumAllocationFactor;
 
   private Map<String, User> users = new HashMap<String, User>();
 
@@ -400,11 +393,6 @@ public class LeafQueue extends AbstractCSQueue {
     return Collections.singletonList(userAclInfo);
   }
 
-  @Private
-  public int getNodeLocalityDelay() {
-    return nodeLocalityDelay;
-  }
-  
   public String toString() {
     return queueName + ": " + 
         "capacity=" + queueCapacities.getCapacity() + ", " + 
@@ -745,39 +733,57 @@ public class LeafQueue extends AbstractCSQueue {
     return applicationAttemptMap.get(applicationAttemptId);
   }
   
+  private void handleExcessReservedContainer(Resource clusterResource,
+      CSAssignment assignment) {
+    if (assignment.getExcessReservation() != null) {
+      RMContainer excessReservedContainer = assignment.getExcessReservation();
+
+      completedContainer(clusterResource, assignment.getApplication(),
+          scheduler.getNode(excessReservedContainer.getAllocatedNode()),
+          excessReservedContainer,
+          SchedulerUtils.createAbnormalContainerStatus(
+              excessReservedContainer.getContainerId(),
+              SchedulerUtils.UNRESERVED_CONTAINER),
+          RMContainerEventType.RELEASED, null, false);
+
+      assignment.setExcessReservation(null);
+    }
+  }
+  
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-    
-    if(LOG.isDebugEnabled()) {
+
+    if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " #applications=" + 
-        orderingPolicy.getNumSchedulableEntities());
+          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
     }
-    
+
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      FiCaSchedulerApp application = 
+      FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
-        return assignReservedContainer(application, node, reservedContainer,
+        CSAssignment assignment = application.assignReservedContainer(node, reservedContainer,
             clusterResource, schedulingMode);
+        handleExcessReservedContainer(clusterResource, assignment);
+        return assignment;
       }
     }
-    
+
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
       return NULL_ASSIGNMENT;
     }
-    
+
     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(node.getPartition(),
-        clusterResource, schedulingMode)) {
+    if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
+        schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
@@ -785,233 +791,74 @@ public class LeafQueue extends AbstractCSQueue {
       }
       return NULL_ASSIGNMENT;
     }
-    
+
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
-        orderingPolicy.getAssignmentIterator();
-        assignmentIterator.hasNext();) {
+        orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
       FiCaSchedulerApp application = assignmentIterator.next();
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("pre-assignContainers for application "
-        + application.getApplicationId());
-        application.showRequests();
+
+      // Check queue max-capacity limit
+      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+          currentResourceLimits, application.getCurrentReservation(),
+          schedulingMode)) {
+        return NULL_ASSIGNMENT;
       }
       
-      // Check if application needs more resource, skip if it doesn't need more.
-      if (!application.hasPendingResourceRequest(resourceCalculator,
-          node.getPartition(), clusterResource, schedulingMode)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
-              + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-label=" + node.getPartition());
-        }
+      Resource userLimit =
+          computeUserLimitAndSetHeadroom(application, clusterResource,
+              node.getPartition(), schedulingMode);
+
+      // Check user limit
+      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+          application, node.getPartition(), currentResourceLimits)) {
         continue;
       }
 
-      synchronized (application) {
-        // Check if this resource is on the blacklist
-        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
-          continue;
-        }
-        
-        // Schedule in priority order
-        for (Priority priority : application.getPriorities()) {
-          ResourceRequest anyRequest =
-              application.getResourceRequest(priority, ResourceRequest.ANY);
-          if (null == anyRequest) {
-            continue;
-          }
-          
-          // Required resource
-          Resource required = anyRequest.getCapability();
+      // Try to schedule
+      CSAssignment assignment =
+          application.assignContainers(clusterResource, node,
+              currentResourceLimits, schedulingMode);
 
-          // Do we need containers at this 'priority'?
-          if (application.getTotalRequiredResources(priority) <= 0) {
-            continue;
-          }
-          
-          // AM container allocation doesn't support non-exclusive allocation to
-          // avoid painful of preempt an AM container
-          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-            RMAppAttempt rmAppAttempt =
-                csContext.getRMContext().getRMApps()
-                    .get(application.getApplicationId()).getCurrentAppAttempt();
-            if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
-                && null == rmAppAttempt.getMasterContainer()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Skip allocating AM container to app_attempt="
-                    + application.getApplicationAttemptId()
-                    + ", don't allow to allocate AM container in non-exclusive mode");
-              }
-              break;
-            }
-          }
-          
-          // Is the node-label-expression of this offswitch resource request
-          // matches the node's label?
-          // If not match, jump to next priority.
-          if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-              anyRequest, node.getPartition(), schedulingMode)) {
-            continue;
-          }
-          
-          if (!this.reservationsContinueLooking) {
-            if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("doesn't need containers based on reservation algo!");
-              }
-              continue;
-            }
-          }
-          
-          // Compute user-limit & set headroom
-          // Note: We compute both user-limit & headroom with the highest 
-          //       priority request as the target. 
-          //       This works since we never assign lower priority requests
-          //       before all higher priority ones are serviced.
-          Resource userLimit = 
-              computeUserLimitAndSetHeadroom(application, clusterResource, 
-                  required, node.getPartition(), schedulingMode);
-
-          // Check queue max-capacity limit
-          if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-              currentResourceLimits, required,
-              application.getCurrentReservation(), schedulingMode)) {
-            return NULL_ASSIGNMENT;
-          }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("post-assignContainers for application "
+            + application.getApplicationId());
+        application.showRequests();
+      }
 
-          // Check user limit
-          if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-              application, node.getPartition(), currentResourceLimits)) {
-            break;
-          }
+      // Did we schedule or reserve a container?
+      Resource assigned = assignment.getResource();
+      
+      handleExcessReservedContainer(clusterResource, assignment);
 
-          // Inform the application it is about to get a scheduling opportunity
-          application.addSchedulingOpportunity(priority);
-          
-          // Increase missed-non-partitioned-resource-request-opportunity.
-          // This is to make sure non-partitioned-resource-request will prefer
-          // to be allocated to non-partitioned nodes
-          int missedNonPartitionedRequestSchedulingOpportunity = 0;
-          if (anyRequest.getNodeLabelExpression().equals(
-              RMNodeLabelsManager.NO_LABEL)) {
-            missedNonPartitionedRequestSchedulingOpportunity =
-                application
-                    .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
-          }
-          
-          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-            // Before doing allocation, we need to check scheduling opportunity to
-            // make sure : non-partitioned resource request should be scheduled to
-            // non-partitioned partition first.
-            if (missedNonPartitionedRequestSchedulingOpportunity < scheduler
-                .getNumClusterNodes()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Skip app_attempt="
-                    + application.getApplicationAttemptId()
-                    + " priority="
-                    + priority
-                    + " because missed-non-partitioned-resource-request"
-                    + " opportunity under requred:"
-                    + " Now=" + missedNonPartitionedRequestSchedulingOpportunity
-                    + " required="
-                    + scheduler.getNumClusterNodes());
-              }
-
-              break;
-            }
-          }
-          
-          // Try to schedule
-          CSAssignment assignment =
-            assignContainersOnNode(clusterResource, node, application, priority,
-                null, schedulingMode, currentResourceLimits);
-
-          // Did the application skip this node?
-          if (assignment.getSkipped()) {
-            // Don't count 'skipped nodes' as a scheduling opportunity!
-            application.subtractSchedulingOpportunity(priority);
-            continue;
-          }
-          
-          // Did we schedule or reserve a container?
-          Resource assigned = assignment.getResource();
-          if (Resources.greaterThan(
-              resourceCalculator, clusterResource, assigned, Resources.none())) {
-            // Get reserved or allocated container from application
-            RMContainer reservedOrAllocatedRMContainer =
-                application.getRMContainer(assignment
-                    .getAssignmentInformation()
-                    .getFirstAllocatedOrReservedContainerId());
-
-            // Book-keeping 
-            // Note: Update headroom to account for current allocation too...
-            allocateResource(clusterResource, application, assigned,
-                node.getPartition(), reservedOrAllocatedRMContainer);
-            
-            // Don't reset scheduling opportunities for offswitch assignments
-            // otherwise the app will be delayed for each non-local assignment.
-            // This helps apps with many off-cluster requests schedule faster.
-            if (assignment.getType() != NodeType.OFF_SWITCH) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Resetting scheduling opportunities");
-              }
-              application.resetSchedulingOpportunities(priority);
-            }
-            // Non-exclusive scheduling opportunity is different: we need reset
-            // it every time to make sure non-labeled resource request will be
-            // most likely allocated on non-labeled nodes first. 
-            application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
-            
-            // Done
-            return assignment;
-          } else {
-            // Do not assign out of order w.r.t priorities
-            break;
-          }
-        }
-      }
+      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+          Resources.none())) {
+        // Get reserved or allocated container from application
+        RMContainer reservedOrAllocatedRMContainer =
+            application.getRMContainer(assignment.getAssignmentInformation()
+                .getFirstAllocatedOrReservedContainerId());
 
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application "
-          + application.getApplicationId());
+        // Book-keeping
+        // Note: Update headroom to account for current allocation too...
+        allocateResource(clusterResource, application, assigned,
+            node.getPartition(), reservedOrAllocatedRMContainer);
+
+        // Done
+        return assignment;
+      } else if (!assignment.getSkipped()) {
+        // If nothing was allocated and the application did not skip this
+        // node, return now to respect the FIFO ordering of applications
+        return NULL_ASSIGNMENT;
       }
-      application.showRequests();
     }
-  
-    return NULL_ASSIGNMENT;
 
+    return NULL_ASSIGNMENT;
   }
 
-  private synchronized CSAssignment assignReservedContainer(
-      FiCaSchedulerApp application, FiCaSchedulerNode node,
-      RMContainer rmContainer, Resource clusterResource,
-      SchedulingMode schedulingMode) {
-    // Do we still need this reservation?
-    Priority priority = rmContainer.getReservedPriority();
-    if (application.getTotalRequiredResources(priority) == 0) {
-      // Release
-      return new CSAssignment(application, rmContainer);
-    }
-
-    // Try to assign if we have sufficient resources
-    CSAssignment tmp =
-        assignContainersOnNode(clusterResource, node, application, priority,
-          rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
-    
-    // Doesn't matter... since it's already charged for at time of reservation
-    // "re-reservation" is *free*
-    CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-    if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
-      ret.setFulfilledReservation(true);
-    }
-    return ret;
-  }
-  
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
-      Resource clusterResource, FiCaSchedulerApp application, Resource required) {
+      Resource clusterResource, FiCaSchedulerApp application) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application, clusterResource, required, user,
-            RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+        computeUserLimit(application, clusterResource, user,
+            RMNodeLabelsManager.NO_LABEL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
   }
   
   private Resource getHeadroom(User user, Resource currentResourceLimit,
@@ -1055,7 +902,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
-      Resource clusterResource, Resource required, String nodePartition,
+      Resource clusterResource, String nodePartition,
       SchedulingMode schedulingMode) {
     String user = application.getUser();
     User queueUser = getUser(user);
@@ -1063,8 +910,8 @@ public class LeafQueue extends AbstractCSQueue {
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
     Resource userLimit =
-        computeUserLimit(application, clusterResource, required,
-            queueUser, nodePartition, schedulingMode);
+        computeUserLimit(application, clusterResource, queueUser,
+            nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
     
@@ -1081,7 +928,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
     
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
-      queueUser, this, application, required, queueResourceLimitsInfo);
+      queueUser, this, application, queueResourceLimitsInfo);
     
     application.setHeadroomProvider(headroomProvider);
 
@@ -1091,8 +938,13 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   @Lock(NoLock.class)
+  public int getNodeLocalityDelay() {
+    return nodeLocalityDelay;
+  }
+
+  @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application,
-      Resource clusterResource, Resource required, User user,
+      Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -1106,6 +958,11 @@ public class LeafQueue extends AbstractCSQueue {
             queueCapacities.getAbsoluteCapacity(nodePartition),
             minimumAllocation);
 
+    // Assume the required resource equals minimumAllocation; this makes
+    // sure the user limit can continuously increase until queueMaxResource
+    // is reached.
+    Resource required = minimumAllocation;
+
     // Allow progress for queues with miniscule capacity
     queueCapacity =
         Resources.max(
@@ -1206,8 +1063,8 @@ public class LeafQueue extends AbstractCSQueue {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getUsed(),application.getCurrentReservation()),
-            limit)) {
+            Resources.subtract(user.getUsed(),
+                application.getCurrentReservation()), limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
@@ -1215,13 +1072,11 @@ public class LeafQueue extends AbstractCSQueue {
                 + user.getUsed() + " reserved: "
                 + application.getCurrentReservation() + " limit: " + limit);
           }
-          Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit);
-          // we can only acquire a new container if we unreserve first since we ignored the
-          // user limit. Choose the max of user limit or what was previously set by max
-          // capacity.
-          currentResoureLimits.setAmountNeededUnreserve(
-              Resources.max(resourceCalculator, clusterResource,
-                  currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve));
+          Resource amountNeededToUnreserve =
+              Resources.subtract(user.getUsed(nodePartition), limit);
+          // we can only acquire a new container if we unreserve first to
+          // respect user-limit
+          currentResoureLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
           return true;
         }
       }
@@ -1235,476 +1090,6 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
-  boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
-      Priority priority, Resource required) {
-    int requiredContainers = application.getTotalRequiredResources(priority);
-    int reservedContainers = application.getNumReservedContainers(priority);
-    int starvation = 0;
-    if (reservedContainers > 0) {
-      float nodeFactor = 
-          Resources.ratio(
-              resourceCalculator, required, getMaximumAllocation()
-              );
-      
-      // Use percentage of node required to bias against large containers...
-      // Protect against corner case where you need the whole node with
-      // Math.min(nodeFactor, minimumAllocationFactor)
-      starvation = 
-          (int)((application.getReReservations(priority) / (float)reservedContainers) * 
-                (1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
-               );
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("needsContainers:" +
-            " app.#re-reserve=" + application.getReReservations(priority) + 
-            " reserved=" + reservedContainers + 
-            " nodeFactor=" + nodeFactor + 
-            " minAllocFactor=" + getMinimumAllocationFactor() +
-            " starvation=" + starvation);
-      }
-    }
-    return (((starvation + requiredContainers) - reservedContainers) > 0);
-  }
-
-  private CSAssignment assignContainersOnNode(Resource clusterResource,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-
-    CSAssignment assigned;
-
-    NodeType requestType = null;
-    MutableObject allocatedContainer = new MutableObject();
-    // Data-local
-    ResourceRequest nodeLocalResourceRequest =
-        application.getResourceRequest(priority, node.getNodeName());
-    if (nodeLocalResourceRequest != null) {
-      requestType = NodeType.NODE_LOCAL;
-      assigned =
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
-            node, application, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-        assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
-            requestType);
-        }
-        assigned.setType(NodeType.NODE_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Rack-local
-    ResourceRequest rackLocalResourceRequest =
-        application.getResourceRequest(priority, node.getRackName());
-    if (rackLocalResourceRequest != null) {
-      if (!rackLocalResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-
-      if (requestType != NodeType.NODE_LOCAL) {
-        requestType = NodeType.RACK_LOCAL;
-      }
-
-      assigned = 
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
-            node, application, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-        assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
-            requestType);
-        }
-        assigned.setType(NodeType.RACK_LOCAL);
-        return assigned;
-      }
-    }
-    
-    // Off-switch
-    ResourceRequest offSwitchResourceRequest =
-        application.getResourceRequest(priority, ResourceRequest.ANY);
-    if (offSwitchResourceRequest != null) {
-      if (!offSwitchResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-      if (requestType != NodeType.NODE_LOCAL
-          && requestType != NodeType.RACK_LOCAL) {
-        requestType = NodeType.OFF_SWITCH;
-      }
-
-      assigned =
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-            node, application, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-
-      // update locality statistics
-      if (allocatedContainer.getValue() != null) {
-        application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
-      }
-      assigned.setType(NodeType.OFF_SWITCH);
-      return assigned;
-    }
-    
-    return SKIP_ASSIGNMENT;
-  }
-
-  @Private
-  protected boolean findNodeToUnreserve(Resource clusterResource,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource minimumUnreservedResource) {
-    // need to unreserve some other container first
-    NodeId idToUnreserve =
-        application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
-            resourceCalculator, clusterResource);
-    if (idToUnreserve == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("checked to see if could unreserve for app but nothing "
-            + "reserved that matches for this app");
-      }
-      return false;
-    }
-    FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
-    if (nodeToUnreserve == null) {
-      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
-      return false;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("unreserving for app: " + application.getApplicationId()
-        + " on nodeId: " + idToUnreserve
-        + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + minimumUnreservedResource);
-    }
-
-    // headroom
-    Resources.addTo(application.getHeadroom(), nodeToUnreserve
-        .getReservedContainer().getReservedResource());
-
-    // Make sure to not have completedContainers sort the queues here since
-    // we are already inside an iterator loop for the queues and this would
-    // cause an concurrent modification exception.
-    completedContainer(clusterResource, application, nodeToUnreserve,
-        nodeToUnreserve.getReservedContainer(),
-        SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
-            .getReservedContainer().getContainerId(),
-            SchedulerUtils.UNRESERVED_CONTAINER),
-        RMContainerEventType.RELEASED, null, false);
-    return true;
-  }
-
-  private CSAssignment assignNodeLocalContainers(Resource clusterResource,
-      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-  }
-
-  private CSAssignment assignRackLocalContainers(Resource clusterResource,
-      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
-  }
-
-  private CSAssignment assignOffSwitchContainers(Resource clusterResource,
-      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, application, priority,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-    
-    return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
-  }
-  
-  private int getActualNodeLocalityDelay() {
-    return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay());
-  }
-
-  boolean canAssign(FiCaSchedulerApp application, Priority priority, 
-      FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
-
-    // Clearly we need containers for this application...
-    if (type == NodeType.OFF_SWITCH) {
-      if (reservedContainer != null) {
-        return true;
-      }
-
-      // 'Delay' off-switch
-      ResourceRequest offSwitchRequest = 
-          application.getResourceRequest(priority, ResourceRequest.ANY);
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
-      long requiredContainers = offSwitchRequest.getNumContainers(); 
-      
-      float localityWaitFactor = 
-        application.getLocalityWaitFactor(priority, 
-            scheduler.getNumClusterNodes());
-      
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
-    }
-
-    // Check if we need containers on this rack 
-    ResourceRequest rackLocalRequest = 
-      application.getResourceRequest(priority, node.getRackName());
-    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
-      return false;
-    }
-      
-    // If we are here, we do need containers on this rack for RACK_LOCAL req
-    if (type == NodeType.RACK_LOCAL) {
-      // 'Delay' rack-local just a little bit...
-      long missedOpportunities = application.getSchedulingOpportunities(priority);
-      return getActualNodeLocalityDelay() < missedOpportunities;
-    }
-
-    // Check if we need containers on this host
-    if (type == NodeType.NODE_LOCAL) {
-      // Now check if we need containers on this host...
-      ResourceRequest nodeLocalRequest = 
-        application.getResourceRequest(priority, node.getNodeName());
-      if (nodeLocalRequest != null) {
-        return nodeLocalRequest.getNumContainers() > 0;
-      }
-    }
-
-    return false;
-  }
-  
-  private Container getContainer(RMContainer rmContainer, 
-      FiCaSchedulerApp application, FiCaSchedulerNode node, 
-      Resource capability, Priority priority) {
-    return (rmContainer != null) ? rmContainer.getContainer() :
-      createContainer(application, node, capability, priority);
-  }
-
-  Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, 
-      Resource capability, Priority priority) {
-  
-    NodeId nodeId = node.getRMNode().getNodeID();
-    ContainerId containerId = BuilderUtils.newContainerId(application
-        .getApplicationAttemptId(), application.getNewContainerId());
-  
-    // Create the container
-    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-      .getHttpAddress(), capability, priority, null);
-
-  }
-
-
-  private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
-      FiCaSchedulerApp application, Priority priority, 
-      ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " application=" + application.getApplicationId()
-        + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type);
-    }
-    
-    // check if the resource request can access the label
-    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
-        node.getPartition(), schedulingMode)) {
-      // this is a reserved container, but we cannot allocate it now according
-      // to label not match. This can be caused by node label changed
-      // We should un-reserve this container.
-      if (rmContainer != null) {
-        unreserve(application, priority, node, rmContainer);
-      }
-      return new CSAssignment(Resources.none(), type);
-    }
-    
-    Resource capability = request.getCapability();
-    Resource available = node.getAvailableResource();
-    Resource totalResource = node.getTotalResource();
-
-    if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
-        capability, totalResource)) {
-      LOG.warn("Node : " + node.getNodeID()
-          + " does not have sufficient resource for request : " + request
-          + " node total capability : " + node.getTotalResource());
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    assert Resources.greaterThan(
-        resourceCalculator, clusterResource, available, Resources.none());
-
-    // Create the container if necessary
-    Container container = 
-        getContainer(rmContainer, application, node, capability, priority);
-  
-    // something went wrong getting/creating the container 
-    if (container == null) {
-      LOG.warn("Couldn't get container for allocation!");
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
-        application, priority, capability);
-
-    // Can we allocate a container on this node?
-    int availableContainers = 
-        resourceCalculator.computeAvailableContainers(available, capability);
-
-    boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
-        currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
-
-    if (availableContainers > 0) {
-      // Allocate...
-
-      // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null) {
-        unreserve(application, priority, node, rmContainer);
-      } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
-        // when reservationsContinueLooking is set, we may need to unreserve
-        // some containers to meet this queue, its parents', or the users' resource limits.
-        // TODO, need change here when we want to support continuous reservation
-        // looking for labeled partitions.
-        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
-          // If we shouldn't allocate/reserve new container then we should unreserve one the same
-          // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
-          // could be zero. If the limit was hit then use the amount we need to unreserve to be
-          // under the limit.
-          Resource amountToUnreserve = capability;
-          if (needToUnreserve) {
-            amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
-          }
-          boolean containerUnreserved =
-              findNodeToUnreserve(clusterResource, node, application, priority,
-                  amountToUnreserve);
-          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved 
-          // container (That means we *have to* unreserve some resource to
-          // continue)). If we failed to unreserve some resource, we can't continue.
-          if (!containerUnreserved) {
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-      }
-
-      // Inform the application
-      RMContainer allocatedContainer = 
-          application.allocate(type, node, priority, request, container);
-
-      // Does the application need this resource?
-      if (allocatedContainer == null) {
-        return new CSAssignment(Resources.none(), type);
-      }
-
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
-            
-      // Inform the ordering policy
-      orderingPolicy.containerAllocated(application, allocatedContainer);
-
-      LOG.info("assignedContainer" +
-          " application attempt=" + application.getApplicationAttemptId() +
-          " container=" + container + 
-          " queue=" + this + 
-          " clusterResource=" + clusterResource);
-      createdContainer.setValue(allocatedContainer);
-      CSAssignment assignment = new CSAssignment(container.getResource(), type);
-      assignment.getAssignmentInformation().addAllocationDetails(
-        container.getId(), getQueuePath());
-      assignment.getAssignmentInformation().incrAllocations();
-      Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-        container.getResource());
-      return assignment;
-    } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was an already a reserved container, reserve it again
-      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
-
-        if (reservationsContinueLooking && rmContainer == null) {
-          // we could possibly ignoring queue capacity or user limits when
-          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
-          // one.
-          if (needToUnreserve) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("we needed to unreserve to be able to allocate");
-            }
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-
-        // Reserve by 'charging' in advance...
-        reserve(application, priority, node, rmContainer, container);
-
-        LOG.info("Reserved container " + 
-            " application=" + application.getApplicationId() + 
-            " resource=" + request.getCapability() + 
-            " queue=" + this.toString() + 
-            " usedCapacity=" + getUsedCapacity() + 
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + 
-            " used=" + queueUsage.getUsed() +
-            " cluster=" + clusterResource);
-        CSAssignment assignment =
-            new CSAssignment(request.getCapability(), type);
-        assignment.getAssignmentInformation().addReservationDetails(
-          container.getId(), getQueuePath());
-        assignment.getAssignmentInformation().incrReservations();
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-          request.getCapability());
-        return assignment;
-      }
-      return new CSAssignment(Resources.none(), type);
-    }
-  }
-
-  private void reserve(FiCaSchedulerApp application, Priority priority, 
-      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
-    // Update reserved metrics if this is the first reservation
-    if (rmContainer == null) {
-      getMetrics().reserveResource(
-          application.getUser(), container.getResource());
-    }
-
-    // Inform the application 
-    rmContainer = application.reserve(node, priority, rmContainer, container);
-    
-    // Update the node
-    node.reserveResource(application, priority, rmContainer);
-  }
-
-  private boolean unreserve(FiCaSchedulerApp application, Priority priority,
-      FiCaSchedulerNode node, RMContainer rmContainer) {
-    // Done with the reservation?
-    if (application.unreserve(node, priority)) {
-      node.unreserveResource(application);
-
-      // Update reserved metrics
-      getMetrics().unreserveResource(application.getUser(),
-          rmContainer.getContainer().getResource());
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
@@ -1724,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
         // happen under scheduler's lock... 
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
-          removed = unreserve(application, rmContainer.getReservedPriority(),
+          removed = application.unreserve(rmContainer.getReservedPriority(),
               node, rmContainer);
         } else {
           removed =
@@ -1838,15 +1223,17 @@ public class LeafQueue extends AbstractCSQueue {
     // Even if ParentQueue will set limits respect child's max queue capacity,
     // but when allocating reserved container, CapacityScheduler doesn't do
     // this. So need cap limits by queue's max capacity here.
-    this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
+    this.cachedResourceLimitsForHeadroom =
+        new ResourceLimits(currentResourceLimits.getLimit());
     Resource queueMaxResource =
         Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
             .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
             queueCapacities
                 .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
             minimumAllocation);
-    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
-        clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
+        resourceCalculator, clusterResource, queueMaxResource,
+        currentResourceLimits.getLimit()));
   }
 
   @Override
@@ -1874,7 +1261,7 @@ public class LeafQueue extends AbstractCSQueue {
       orderingPolicy.getSchedulableEntities()) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
-            Resources.none(), RMNodeLabelsManager.NO_LABEL,
+            RMNodeLabelsManager.NO_LABEL,
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 5807dd1..e54b9e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
   final PartitionedQueueComparator partitionQueueComparator;
   volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
+  private boolean needToResortQueuesAtNextAllocation = false;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -411,7 +412,7 @@ public class ParentQueue extends AbstractCSQueue {
       // This will also consider parent's limits and also continuous reservation
       // looking
       if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          resourceLimits, minimumAllocation, Resources.createResource(
+          resourceLimits, Resources.createResource(
               getMetrics().getReservedMB(), getMetrics()
                   .getReservedVirtualCores()), schedulingMode)) {
         break;
@@ -527,6 +528,14 @@ public class ParentQueue extends AbstractCSQueue {
   
   private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
     if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+      if (needToResortQueuesAtNextAllocation) {
+        // If we skipped re-sorting the queues last time, we need to re-sort
+        // them before allocation.
+        List<CSQueue> childrenList = new ArrayList<>(childQueues);
+        childQueues.clear();
+        childQueues.addAll(childrenList);
+        needToResortQueuesAtNextAllocation = false;
+      }
       return childQueues.iterator();
     }
 
@@ -644,6 +653,11 @@ public class ParentQueue extends AbstractCSQueue {
             }
           }
         }
+        
+        // If we skipped sorting the queues this time, we need to re-sort them
+        // next time to make sure we allocate from the least-used queues (or in
+        // the order defined by the queue policy).
+        needToResortQueuesAtNextAllocation = !sortQueues;
       }
 
       // Inform the parent


Mime
View raw message