hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [13/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
Date Fri, 11 Nov 2016 00:10:40 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index a69af6e..fd0c68b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -47,8 +47,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.IOException;
@@ -71,12 +76,10 @@ public class ParentQueue extends AbstractCSQueue {
 
   protected final Set<CSQueue> childQueues;  
   private final boolean rootQueue;
-  final Comparator<CSQueue> nonPartitionedQueueComparator;
-  final PartitionedQueueComparator partitionQueueComparator;
-  volatile int numApplications;
+  private final Comparator<CSQueue> nonPartitionedQueueComparator;
+  private final PartitionedQueueComparator partitionQueueComparator;
+  private volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
-  private boolean needToResortQueuesAtNextAllocation = false;
-  private int offswitchPerHeartbeatLimit;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -86,7 +89,7 @@ public class ParentQueue extends AbstractCSQueue {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
     this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
-    this.partitionQueueComparator = cs.getPartitionedQueueComparator();
+    this.partitionQueueComparator = new PartitionedQueueComparator();
 
     this.rootQueue = (parent == null);
 
@@ -126,16 +129,12 @@ public class ParentQueue extends AbstractCSQueue {
         }
       }
 
-      offswitchPerHeartbeatLimit =
-        csContext.getConfiguration().getOffSwitchPerHeartbeatLimit();
-
       LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
           + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
           + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
           + ", absoluteMaxCapacity=" + this.queueCapacities
           .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
           + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
-          + ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit()
           + ", reservationsContinueLooking=" + reservationsContinueLooking);
     } finally {
       writeLock.unlock();
@@ -215,11 +214,6 @@ public class ParentQueue extends AbstractCSQueue {
 
   }
 
-  @Private
-  public int getOffSwitchPerHeartbeatLimit() {
-    return offswitchPerHeartbeatLimit;
-  }
-
   private QueueUserACLInfo getUserAclInfo(
       UserGroupInformation user) {
     try {
@@ -435,156 +429,145 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits resourceLimits,
-      SchedulingMode schedulingMode) {
-    int offswitchCount = 0;
-    try {
-      writeLock.lock();
-      // if our queue cannot access this node, just return
-      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-          && !accessibleToPartition(node.getPartition())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip this queue=" + getQueuePath()
-              + ", because it is not able to access partition=" + node
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
+    SchedulingMode schedulingMode) {
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+
+    // if our queue cannot access this node, just return
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(ps.getPartition())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it is not able to access partition=" + ps
+            .getPartition());
+      }
+
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParentName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
               .getPartition());
-        }
+      if (rootQueue) {
+        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+            node);
+      }
 
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParentName(), getQueueName(), ActivityState.REJECTED,
-            ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
-                .getPartition());
-        if (rootQueue) {
-          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
-              node);
-        }
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
 
-        return CSAssignment.NULL_ASSIGNMENT;
+    // Check if this queue need more resource, simply skip allocation if this
+    // queue doesn't need more resources.
+    if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource,
+        schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + ps
+            .getPartition());
       }
 
-      // Check if this queue need more resource, simply skip allocation if this
-      // queue doesn't need more resources.
-      if (!super.hasPendingResourceRequest(node.getPartition(), clusterResource,
-          schedulingMode)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip this queue=" + getQueuePath()
-              + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-partition=" + node
-              .getPartition());
-        }
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParentName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+      if (rootQueue) {
+        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+            node);
+      }
+
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
+
+    CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0),
+        NodeType.NODE_LOCAL);
+
+    while (canAssign(clusterResource, node)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to assign containers to child-queue of "
+            + getQueueName());
+      }
+
+      // Are we over maximum-capacity for this queue?
+      // This will also consider parent's limits and also continuous reservation
+      // looking
+      if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
+          resourceLimits, Resources
+              .createResource(getMetrics().getReservedMB(),
+                  getMetrics().getReservedVirtualCores()), schedulingMode)) {
 
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             getParentName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
         if (rootQueue) {
           ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
               node);
         }
 
-        return CSAssignment.NULL_ASSIGNMENT;
+        break;
       }
 
-      CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0),
-          NodeType.NODE_LOCAL);
+      // Schedule
+      CSAssignment assignedToChild = assignContainersToChildQueues(
+          clusterResource, ps, resourceLimits, schedulingMode);
+      assignment.setType(assignedToChild.getType());
+      assignment.setRequestLocalityType(
+          assignedToChild.getRequestLocalityType());
+      assignment.setExcessReservation(assignedToChild.getExcessReservation());
+      assignment.setContainersToKill(assignedToChild.getContainersToKill());
 
-      while (canAssign(clusterResource, node)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to assign containers to child-queue of "
-              + getQueueName());
-        }
+      // Done if no child-queue assigned anything
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          assignedToChild.getResource(), Resources.none())) {
 
-        // Are we over maximum-capacity for this queue?
-        // This will also consider parent's limits and also continuous reservation
-        // looking
-        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-            resourceLimits, Resources
-                .createResource(getMetrics().getReservedMB(),
-                    getMetrics().getReservedVirtualCores()), schedulingMode)) {
-
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParentName(), getQueueName(), ActivityState.SKIPPED,
-              ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
+        boolean isReserved =
+            assignedToChild.getAssignmentInformation().getReservationDetails()
+                != null && !assignedToChild.getAssignmentInformation()
+                .getReservationDetails().isEmpty();
+        if (node != null && !isReserved) {
           if (rootQueue) {
-            ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
-                node);
-          }
-
-          break;
-        }
-
-        // Schedule
-        CSAssignment assignedToChild = assignContainersToChildQueues(
-            clusterResource, node, resourceLimits, schedulingMode);
-        assignment.setType(assignedToChild.getType());
-
-        // Done if no child-queue assigned anything
-        if (Resources.greaterThan(resourceCalculator, clusterResource,
-            assignedToChild.getResource(), Resources.none())) {
-
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParentName(), getQueueName(), ActivityState.ACCEPTED,
-              ActivityDiagnosticConstant.EMPTY);
-
-          if (node.getReservedContainer() == null) {
-            if (rootQueue) {
-              ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
-                  activitiesManager, node,
-                  assignedToChild.getAssignmentInformation()
-                      .getFirstAllocatedOrReservedContainerId(),
-                  AllocationState.ALLOCATED);
-            }
-          } else{
-            if (rootQueue) {
-              ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
-                  activitiesManager, node,
-                  assignedToChild.getAssignmentInformation()
-                      .getFirstAllocatedOrReservedContainerId(),
-                  AllocationState.RESERVED);
-            }
+            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                activitiesManager, node,
+                assignedToChild.getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId(),
+                AllocationState.ALLOCATED);
           }
-
-          // Track resource utilization for the parent-queue
-          allocateResource(clusterResource, assignedToChild.getResource(),
-              node.getPartition(), assignedToChild.isIncreasedAllocation());
-
-          // Track resource utilization in this pass of the scheduler
-          Resources.addTo(assignment.getResource(),
-              assignedToChild.getResource());
-          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-              assignedToChild.getAssignmentInformation().getAllocated());
-          Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-              assignedToChild.getAssignmentInformation().getReserved());
-          assignment.getAssignmentInformation().incrAllocations(
-              assignedToChild.getAssignmentInformation().getNumAllocations());
-          assignment.getAssignmentInformation().incrReservations(
-              assignedToChild.getAssignmentInformation().getNumReservations());
-          assignment.getAssignmentInformation().getAllocationDetails().addAll(
-              assignedToChild.getAssignmentInformation()
-                  .getAllocationDetails());
-          assignment.getAssignmentInformation().getReservationDetails().addAll(
-              assignedToChild.getAssignmentInformation()
-                  .getReservationDetails());
-          assignment.setIncreasedAllocation(
-              assignedToChild.isIncreasedAllocation());
-
-          LOG.info("assignedContainer" + " queue=" + getQueueName()
-              + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
-              + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed()
-              + " cluster=" + clusterResource);
-
         } else{
-          assignment.setSkippedType(assignedToChild.getSkippedType());
-
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParentName(), getQueueName(), ActivityState.SKIPPED,
-              ActivityDiagnosticConstant.EMPTY);
           if (rootQueue) {
-            ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
-                node);
+            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                activitiesManager, node,
+                assignedToChild.getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId(),
+                AllocationState.RESERVED);
           }
-
-          break;
         }
 
+        // Track resource utilization in this pass of the scheduler
+        Resources.addTo(assignment.getResource(),
+            assignedToChild.getResource());
+        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+            assignedToChild.getAssignmentInformation().getAllocated());
+        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+            assignedToChild.getAssignmentInformation().getReserved());
+        assignment.getAssignmentInformation().incrAllocations(
+            assignedToChild.getAssignmentInformation().getNumAllocations());
+        assignment.getAssignmentInformation().incrReservations(
+            assignedToChild.getAssignmentInformation().getNumReservations());
+        assignment.getAssignmentInformation().getAllocationDetails().addAll(
+            assignedToChild.getAssignmentInformation()
+                .getAllocationDetails());
+        assignment.getAssignmentInformation().getReservationDetails().addAll(
+            assignedToChild.getAssignmentInformation()
+                .getReservationDetails());
+        assignment.setIncreasedAllocation(
+            assignedToChild.isIncreasedAllocation());
+
+        LOG.info("assignedContainer" + " queue=" + getQueueName()
+            + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+            + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed()
+            + " cluster=" + clusterResource);
+
         if (LOG.isDebugEnabled()) {
           LOG.debug(
               "ParentQ=" + getQueueName() + " assignedSoFarInThisIteration="
@@ -592,39 +575,47 @@ public class ParentQueue extends AbstractCSQueue {
                   + getUsedCapacity() + " absoluteUsedCapacity="
                   + getAbsoluteUsedCapacity());
         }
+      } else{
+        assignment.setSkippedType(assignedToChild.getSkippedType());
 
-        if (assignment.getType() == NodeType.OFF_SWITCH) {
-          offswitchCount++;
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
+        if (rootQueue) {
+          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+              node);
         }
 
-        // Do not assign more containers if this isn't the root queue
-        // or if we've already assigned enough OFF_SWITCH containers in
-        // this pass
-        if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
-          if (LOG.isDebugEnabled()) {
-            if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
-              LOG.debug("Assigned maximum number of off-switch containers: " +
-                  offswitchCount + ", assignments so far: " + assignment);
-            }
-          }
-          break;
-        }
+        break;
       }
 
-      return assignment;
-    } finally {
-      writeLock.unlock();
+      /*
+       * Previously here, we can allocate more than one container for each
+       * allocation under rootQ. Now this logic is not proper any more
+       * in global scheduling world.
+       *
+       * So here do not try to allocate more than one container for each
+       * allocation, let top scheduler make the decision.
+       */
+      break;
     }
+
+    return assignment;
   }
 
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
+    // When node == null means global scheduling is enabled, always return true
+    if (null == node) {
+      return true;
+    }
+
     // Two conditions need to meet when trying to allocate:
     // 1) Node doesn't have reserved container
     // 2) Node's available-resource + killable-resource should > 0
     return node.getReservedContainer() == null && Resources.greaterThanOrEqual(
         resourceCalculator, clusterResource, Resources
-            .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
-        minimumAllocation);
+            .add(node.getUnallocatedResource(),
+                node.getTotalKillableResources()), minimumAllocation);
   }
 
   private ResourceLimits getResourceLimitsOfChild(CSQueue child,
@@ -662,28 +653,20 @@ public class ParentQueue extends AbstractCSQueue {
 
     return new ResourceLimits(childLimit);
   }
-  
-  private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
-    if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
-      if (needToResortQueuesAtNextAllocation) {
-        // If we skipped resort queues last time, we need to re-sort queue
-        // before allocation
-        List<CSQueue> childrenList = new ArrayList<>(childQueues);
-        childQueues.clear();
-        childQueues.addAll(childrenList);
-        needToResortQueuesAtNextAllocation = false;
-      }
-      return childQueues.iterator();
-    }
 
-    partitionQueueComparator.setPartitionToLookAt(node.getPartition());
+  private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
+      String partition) {
+    // Previously we keep a sorted list for default partition, it is not good
+    // when multi-threading scheduler is enabled, so to make a simpler code
+    // now re-sort queue every time irrespective to node partition.
+    partitionQueueComparator.setPartitionToLookAt(partition);
     List<CSQueue> childrenList = new ArrayList<>(childQueues);
     Collections.sort(childrenList, partitionQueueComparator);
     return childrenList.iterator();
   }
-  
-  private CSAssignment assignContainersToChildQueues(
-      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
+
+  private CSAssignment assignContainersToChildQueues(Resource cluster,
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits,
       SchedulingMode schedulingMode) {
     CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
 
@@ -691,8 +674,8 @@ public class ParentQueue extends AbstractCSQueue {
     printChildQueues();
 
     // Try to assign to most 'under-served' sub-queue
-    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(node); iter
-        .hasNext();) {
+    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
+        ps.getPartition()); iter.hasNext(); ) {
       CSQueue childQueue = iter.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@@ -702,9 +685,9 @@ public class ParentQueue extends AbstractCSQueue {
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, parentLimits,
-              node.getPartition());
+              ps.getPartition());
       
-      CSAssignment childAssignment = childQueue.assignContainers(cluster, node,
+      CSAssignment childAssignment = childQueue.assignContainers(cluster, ps,
           childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
@@ -712,22 +695,9 @@ public class ParentQueue extends AbstractCSQueue {
             childAssignment.getResource() + ", " + childAssignment.getType());
       }
 
-      // If we do assign, remove the queue and re-insert in-order to re-sort
       if (Resources.greaterThan(
               resourceCalculator, cluster, 
               childAssignment.getResource(), Resources.none())) {
-        // Only update childQueues when we doing non-partitioned node
-        // allocation.
-        if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
-          // Remove and re-insert to sort
-          iter.remove();
-          LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
-              + " stats: " + childQueue);
-          childQueues.add(childQueue);
-          if (LOG.isDebugEnabled()) {
-            printChildQueues();
-          }
-        }
         assignment = childAssignment;
         break;
       } else if (childAssignment.getSkippedType() ==
@@ -770,10 +740,10 @@ public class ParentQueue extends AbstractCSQueue {
         + " child-queues: " + getChildQueuesToPrint());
     }
   }
-  
+
   private void internalReleaseResource(Resource clusterResource,
-      FiCaSchedulerNode node, Resource releasedResource, boolean changeResource,
-      CSQueue completedChildQueue, boolean sortQueues) {
+      FiCaSchedulerNode node, Resource releasedResource,
+      boolean changeResource) {
     try {
       writeLock.lock();
       super.releaseResource(clusterResource, releasedResource,
@@ -784,29 +754,6 @@ public class ParentQueue extends AbstractCSQueue {
             "completedContainer " + this + ", cluster=" + clusterResource);
       }
 
-      // Note that this is using an iterator on the childQueues so this can't
-      // be called if already within an iterator for the childQueues. Like
-      // from assignContainersToChildQueues.
-      if (sortQueues) {
-        // reinsert the updated queue
-        for (Iterator<CSQueue> iter = childQueues.iterator();
-             iter.hasNext(); ) {
-          CSQueue csqueue = iter.next();
-          if (csqueue.equals(completedChildQueue)) {
-            iter.remove();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Re-sorting completed queue: " + csqueue);
-            }
-            childQueues.add(csqueue);
-            break;
-          }
-        }
-      }
-
-      // If we skipped sort queue this time, we need to resort queues to make
-      // sure we allocate from least usage (or order defined by queue policy)
-      // queues.
-      needToResortQueuesAtNextAllocation = !sortQueues;
     } finally {
       writeLock.unlock();
     }
@@ -821,8 +768,7 @@ public class ParentQueue extends AbstractCSQueue {
         Resources.negate(decreaseRequest.getDeltaCapacity());
 
     internalReleaseResource(clusterResource,
-        csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false,
-        null, false);
+        csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false);
 
     // Inform the parent
     if (parent != null) {
@@ -835,7 +781,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
     if (app != null) {
       internalReleaseResource(clusterResource, node,
-          rmContainer.getReservedResource(), false, null, false);
+          rmContainer.getReservedResource(), false);
 
       // Inform the parent
       if (parent != null) {
@@ -853,8 +799,7 @@ public class ParentQueue extends AbstractCSQueue {
       boolean sortQueues) {
     if (application != null) {
       internalReleaseResource(clusterResource, node,
-          rmContainer.getContainer().getResource(), false, completedChildQueue,
-          sortQueues);
+          rmContainer.getContainer().getResource(), false);
 
       // Inform the parent
       if (parent != null) {
@@ -1062,4 +1007,37 @@ public class ParentQueue extends AbstractCSQueue {
       }
     }
   }
+
+  public void apply(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    if (request.anythingAllocatedOrReserved()) {
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+          allocation = request.getFirstAllocatedOrReservedContainer();
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+          schedulerContainer = allocation.getAllocatedOrReservedContainer();
+
+      // Do not modify queue when allocation from reserved container
+      if (allocation.getAllocateFromReservedContainer() == null) {
+        try {
+          writeLock.lock();
+          // Book-keeping
+          // Note: Update headroom to account for current allocation too...
+          allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
+              schedulerContainer.getNodePartition(),
+              allocation.isIncreasedAllocation());
+
+          LOG.info("assignedContainer" + " queue=" + getQueueName()
+              + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+              + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed()
+              + " cluster=" + cluster);
+        } finally {
+          writeLock.unlock();
+        }
+      }
+    }
+
+    if (parent != null) {
+      parent.apply(cluster, request);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index fa13df4..5bb91e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -77,11 +78,13 @@ public abstract class AbstractContainerAllocator {
     // Handle excess reservation
     assignment.setExcessReservation(result.getContainerToBeUnreserved());
 
+    assignment.setRequestLocalityType(result.requestLocalityType);
+
     // If we allocated something
     if (Resources.greaterThan(rc, clusterResource,
         result.getResourceToBeAllocated(), Resources.none())) {
       Resource allocatedResource = result.getResourceToBeAllocated();
-      Container updatedContainer = result.getUpdatedContainer();
+      RMContainer updatedContainer = result.getUpdatedContainer();
 
       assignment.setResource(allocatedResource);
       assignment.setType(result.getContainerNodeType());
@@ -92,8 +95,7 @@ public abstract class AbstractContainerAllocator {
             + application.getApplicationId() + " resource=" + allocatedResource
             + " queue=" + this.toString() + " cluster=" + clusterResource);
         assignment.getAssignmentInformation().addReservationDetails(
-            updatedContainer.getId(),
-            application.getCSLeafQueue().getQueuePath());
+            updatedContainer, application.getCSLeafQueue().getQueuePath());
         assignment.getAssignmentInformation().incrReservations();
         Resources.addTo(assignment.getAssignmentInformation().getReserved(),
             allocatedResource);
@@ -111,41 +113,37 @@ public abstract class AbstractContainerAllocator {
               ActivityState.RESERVED);
           ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
               activitiesManager, application.getApplicationId(),
-              updatedContainer.getId(), ActivityState.RESERVED,
+              updatedContainer.getContainerId(), ActivityState.RESERVED,
               ActivityDiagnosticConstant.EMPTY);
         }
       } else if (result.getAllocationState() == AllocationState.ALLOCATED){
         // This is a new container
         // Inform the ordering policy
-        LOG.info("assignedContainer" + " application attempt="
-            + application.getApplicationAttemptId() + " container="
-            + updatedContainer.getId() + " queue=" + this + " clusterResource="
+        LOG.info("assignedContainer" + " application attempt=" + application
+            .getApplicationAttemptId() + " container=" + updatedContainer
+            .getContainerId() + " queue=" + this + " clusterResource="
             + clusterResource + " type=" + assignment.getType());
 
-        application
-            .getCSLeafQueue()
-            .getOrderingPolicy()
-            .containerAllocated(application,
-                application.getRMContainer(updatedContainer.getId()));
-
         assignment.getAssignmentInformation().addAllocationDetails(
-            updatedContainer.getId(),
-            application.getCSLeafQueue().getQueuePath());
+            updatedContainer, application.getCSLeafQueue().getQueuePath());
         assignment.getAssignmentInformation().incrAllocations();
         Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
             allocatedResource);
 
         if (rmContainer != null) {
           assignment.setFulfilledReservation(true);
+          assignment.setFulfilledReservedContainer(rmContainer);
         }
 
         ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
             node, application, updatedContainer, ActivityState.ALLOCATED);
         ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
             activitiesManager, application.getApplicationId(),
-            updatedContainer.getId(), ActivityState.ACCEPTED,
+            updatedContainer.getContainerId(), ActivityState.ACCEPTED,
             ActivityDiagnosticConstant.EMPTY);
 
+        // Update unconfirmed resource
+        application.incUnconfirmedRes(allocatedResource);
       }
 
       assignment.setContainersToKill(result.getToKillContainers());
@@ -170,8 +168,15 @@ public abstract class AbstractContainerAllocator {
    * <li>Do allocation: this will decide/create allocated/reserved
    * container, this will also update metrics</li>
    * </ul>
+   *
+   * @param clusterResource clusterResource
+   * @param ps PlacementSet
+   * @param schedulingMode scheduling mode (exclusive or nonexclusive)
+   * @param resourceLimits resourceLimits
+   * @param reservedContainer reservedContainer
+   * @return CSAssignment proposal
    */
   public abstract CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, RMContainer reservedContainer);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
index 8f749f6..f408508 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.List;
@@ -57,8 +58,13 @@ public class ContainerAllocation {
   private Resource resourceToBeAllocated = Resources.none();
   AllocationState state;
   NodeType containerNodeType = NodeType.NODE_LOCAL;
-  NodeType requestNodeType = NodeType.NODE_LOCAL;
-  Container updatedContainer;
+  NodeType requestLocalityType = null;
+
+  /**
+   * When some (new) container allocated/reserved or some increase container
+   * request allocated/reserved, updatedContainer will be set.
+   */
+  RMContainer updatedContainer;
   private List<RMContainer> toKillContainers;
 
   public ContainerAllocation(RMContainer containerToBeUnreserved,
@@ -87,7 +93,7 @@ public class ContainerAllocation {
     return containerNodeType;
   }
 
-  public Container getUpdatedContainer() {
+  public RMContainer getUpdatedContainer() {
     return updatedContainer;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 4eaa24b..57188d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerAllocator extends AbstractContainerAllocator {
-  AbstractContainerAllocator increaseContainerAllocator;
-  AbstractContainerAllocator regularContainerAllocator;
+  private AbstractContainerAllocator increaseContainerAllocator;
+  private AbstractContainerAllocator regularContainerAllocator;
 
   public ContainerAllocator(FiCaSchedulerApp application,
       ResourceCalculator rc, RMContext rmContext) {
@@ -52,17 +53,17 @@ public class ContainerAllocator extends AbstractContainerAllocator {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, RMContainer reservedContainer) {
     if (reservedContainer != null) {
       if (reservedContainer.getState() == RMContainerState.RESERVED) {
         // It's a regular container
         return regularContainerAllocator.assignContainers(clusterResource,
-            node, schedulingMode, resourceLimits, reservedContainer);
+            ps, schedulingMode, resourceLimits, reservedContainer);
       } else {
         // It's a increase container
         return increaseContainerAllocator.assignContainers(clusterResource,
-            node, schedulingMode, resourceLimits, reservedContainer);
+            ps, schedulingMode, resourceLimits, reservedContainer);
       }
     } else {
       /*
@@ -70,14 +71,14 @@ public class ContainerAllocator extends AbstractContainerAllocator {
        * anything, we will try to allocate regular container
        */
       CSAssignment assign =
-          increaseContainerAllocator.assignContainers(clusterResource, node,
+          increaseContainerAllocator.assignContainers(clusterResource, ps,
               schedulingMode, resourceLimits, null);
       if (Resources.greaterThan(rc, clusterResource, assign.getResource(),
           Resources.none())) {
         return assign;
       }
 
-      return regularContainerAllocator.assignContainers(clusterResource, node,
+      return regularContainerAllocator.assignContainers(clusterResource, ps,
           schedulingMode, resourceLimits, null);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
index 509dfba..74a64c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -36,16 +30,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
 public class IncreaseContainerAllocator extends AbstractContainerAllocator {
   private static final Log LOG =
       LogFactory.getLog(IncreaseContainerAllocator.class);
@@ -76,7 +75,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
         request.getDeltaCapacity());
     assignment.getAssignmentInformation().incrReservations();
     assignment.getAssignmentInformation().addReservationDetails(
-        request.getContainerId(), application.getCSLeafQueue().getQueuePath());
+        request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
     assignment.setIncreasedAllocation(true);
     
     LOG.info("Reserved increase container request:" + request.toString());
@@ -93,8 +92,12 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
         request.getDeltaCapacity());
     assignment.getAssignmentInformation().incrAllocations();
     assignment.getAssignmentInformation().addAllocationDetails(
-        request.getContainerId(), application.getCSLeafQueue().getQueuePath());
+        request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
     assignment.setIncreasedAllocation(true);
+
+    if (fromReservation) {
+      assignment.setFulfilledReservedContainer(request.getRMContainer());
+    }
     
     // notify application
     application
@@ -114,19 +117,6 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
       SchedContainerChangeRequest increaseRequest) {
     if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
         node.getUnallocatedResource())) {
-      // OK, we can allocate this increase request
-      // Unreserve it first
-      application.unreserve(
-          increaseRequest.getRMContainer().getAllocatedSchedulerKey(),
-          (FiCaSchedulerNode) node, increaseRequest.getRMContainer());
-      
-      // Notify application
-      application.increaseContainer(increaseRequest);
-      
-      // Notify node
-      node.increaseContainer(increaseRequest.getContainerId(),
-          increaseRequest.getDeltaCapacity());
-
       return createSuccessfullyIncreasedCSAssignment(increaseRequest, true);
     } else {
       if (LOG.isDebugEnabled()) {
@@ -144,40 +134,26 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
       Resource cluster, SchedContainerChangeRequest increaseRequest) {
     if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
         node.getUnallocatedResource())) {
-      // Notify node
-      node.increaseContainer(increaseRequest.getContainerId(),
-          increaseRequest.getDeltaCapacity());
-
-      // OK, we can allocate this increase request
-      // Notify application
-      application.increaseContainer(increaseRequest);
       return createSuccessfullyIncreasedCSAssignment(increaseRequest, false);
-    } else {
-      boolean reservationSucceeded =
-          application.reserveIncreasedContainer(
-              increaseRequest.getRMContainer().getAllocatedSchedulerKey(),
-              node, increaseRequest.getRMContainer(),
-              increaseRequest.getDeltaCapacity());
-      
-      if (reservationSucceeded) {
-        // We cannot allocate this container, but since queue capacity /
-        // user-limit matches, we can reserve this container on this node.
-        return createReservedIncreasedCSAssignment(increaseRequest);
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Reserve increase request=" + increaseRequest.toString()
-              + " failed. Skipping..");
-        }
-        return CSAssignment.SKIP_ASSIGNMENT;
-      }
+    } else{
+      // We cannot allocate this container, but since queue capacity /
+      // user-limit matches, we can reserve this container on this node.
+      return createReservedIncreasedCSAssignment(increaseRequest);
     }
   }
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, RMContainer reservedContainer) {
     AppSchedulingInfo sinfo = application.getAppSchedulingInfo();
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+
+    if (null == node) {
+      // This is global scheduling enabled
+      // FIXME, support container increase when global scheduling enabled
+      return CSAssignment.SKIP_ASSIGNMENT;
+    }
     NodeId nodeId = node.getNodeID();
 
     if (reservedContainer == null) {
@@ -258,8 +234,6 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
         }
         Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter =
             increaseRequestMap.entrySet().iterator();
-        List<SchedContainerChangeRequest> toBeRemovedRequests =
-            new ArrayList<>();
 
         while (iter.hasNext()) {
           Entry<ContainerId, SchedContainerChangeRequest> entry =
@@ -289,7 +263,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
             if (LOG.isDebugEnabled()) {
               LOG.debug("  Container is not running any more, skip...");
             }
-            toBeRemovedRequests.add(increaseRequest);
+            application.addToBeRemovedIncreaseRequest(increaseRequest);
             continue;
           }
 
@@ -304,7 +278,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
               LOG.debug("  Target capacity is more than what node can offer,"
                   + " node.resource=" + node.getTotalResource());
             }
-            toBeRemovedRequests.add(increaseRequest);
+            application.addToBeRemovedIncreaseRequest(increaseRequest);
             continue;
           }
 
@@ -318,15 +292,6 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
             break;
           }
         }
-        
-        // Remove invalid in request requests
-        if (!toBeRemovedRequests.isEmpty()) {
-          for (SchedContainerChangeRequest req : toBeRemovedRequests) {
-            sinfo.removeIncreaseRequest(req.getNodeId(),
-                req.getRMContainer().getAllocatedSchedulerKey(),
-                req.getContainerId());
-          }
-        }
 
         // We may have allocated something
         if (assigned != null && assigned.getSkippedType()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1a3f71f..3e8282f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -19,13 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@@ -50,6 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -71,12 +75,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   
   private boolean checkHeadroom(Resource clusterResource,
       ResourceLimits currentResourceLimits, Resource required,
-      FiCaSchedulerNode node) {
+      String nodePartition) {
     // If headroom + currentReservation < required, we cannot allocate this
     // require
     Resource resourceCouldBeUnReserved = application.getCurrentReservation();
     if (!application.getCSLeafQueue().getReservationContinueLooking()
-        || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+        || !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
       // If we don't allow reservation continuous looking, OR we're looking at
       // non-default node partition, we won't allow to unreserve before
       // allocation.
@@ -87,20 +91,17 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         required);
   }
 
-  
-  private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+  /*
+   * Pre-check if we can allocate a pending resource request
+   * (given schedulerKey) to a given PlacementSet.
+   * We will consider things like exclusivity, pending resource, node partition,
+   * headroom, etc.
+   */
+  private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
+      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
     Priority priority = schedulerKey.getPriority();
-
-    if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
-      application.updateAppSkipNodeDiagnostics(
-          CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
-      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-          activitiesManager, node, application, priority,
-          ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
-      return ContainerAllocation.APP_SKIPPED;
-    }
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
 
     ResourceRequest anyRequest =
         application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
@@ -144,7 +145,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // matches the node's label?
     // If not match, jump to next priority.
     if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-        anyRequest.getNodeLabelExpression(), node.getPartition(),
+        anyRequest.getNodeLabelExpression(), ps.getPartition(),
         schedulingMode)) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,
@@ -165,7 +166,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
     }
 
-    if (!checkHeadroom(clusterResource, resourceLimits, required, node)) {
+    if (!checkHeadroom(clusterResource, resourceLimits, required,
+        ps.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
@@ -176,9 +178,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       return ContainerAllocation.QUEUE_SKIPPED;
     }
 
-    // Inform the application it is about to get a scheduling opportunity
-    application.addSchedulingOpportunity(schedulerKey);
-
     // Increase missed-non-partitioned-resource-request-opportunity.
     // This is to make sure non-partitioned-resource-request will prefer
     // to be allocated to non-partitioned nodes
@@ -210,32 +209,43 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         return ContainerAllocation.APP_SKIPPED;
       }
     }
-    
+
     return null;
   }
 
-  ContainerAllocation preAllocation(Resource clusterResource,
+  private ContainerAllocation checkIfNodeBlackListed(FiCaSchedulerNode node,
+      SchedulerRequestKey schedulerKey) {
+    Priority priority = schedulerKey.getPriority();
+
+    if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
+      application.updateAppSkipNodeDiagnostics(
+          CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
+      return ContainerAllocation.APP_SKIPPED;
+    }
+
+    return null;
+  }
+
+  ContainerAllocation tryAllocateOnNode(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer) {
     ContainerAllocation result;
-    if (null == reservedContainer) {
-      // pre-check when allocating new container
-      result =
-          preCheckForNewContainer(clusterResource, node, schedulingMode,
-              resourceLimits, schedulerKey);
-      if (null != result) {
-        return result;
-      }
-    } else {
-      // pre-check when allocating reserved container
-      if (application.getTotalRequiredResources(schedulerKey) == 0) {
-        // Release
-        return new ContainerAllocation(reservedContainer, null,
-            AllocationState.QUEUE_SKIPPED);
-      }
+
+    // Sanity checks before assigning to this node
+    result = checkIfNodeBlackListed(node, schedulerKey);
+    if (null != result) {
+      return result;
     }
 
+    // Inform the application it is about to get a scheduling opportunity
+    // TODO, we may need to revisit here to see if we should add scheduling
+    // opportunity here
+    application.addSchedulingOpportunity(schedulerKey);
+
     // Try to allocate containers on node
     result =
         assignContainersOnNode(clusterResource, node, schedulerKey,
@@ -383,20 +393,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     Priority priority = schedulerKey.getPriority();
 
     ContainerAllocation allocation;
+    NodeType requestLocalityType = null;
 
-    NodeType requestType = null;
     // Data-local
     ResourceRequest nodeLocalResourceRequest =
         application.getResourceRequest(schedulerKey, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
-      requestType = NodeType.NODE_LOCAL;
+      requestLocalityType = NodeType.NODE_LOCAL;
       allocation =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
               node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
           allocation.getResourceToBeAllocated(), Resources.none())) {
-        allocation.requestNodeType = requestType;
+        allocation.requestLocalityType = requestLocalityType;
         return allocation;
       }
     }
@@ -412,9 +422,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
 
-      if (requestType != NodeType.NODE_LOCAL) {
-        requestType = NodeType.RACK_LOCAL;
-      }
+      requestLocalityType = requestLocalityType == null ?
+          NodeType.RACK_LOCAL :
+          requestLocalityType;
 
       allocation =
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
@@ -422,7 +432,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
           allocation.getResourceToBeAllocated(), Resources.none())) {
-        allocation.requestNodeType = requestType;
+        allocation.requestLocalityType = requestLocalityType;
         return allocation;
       }
     }
@@ -437,22 +447,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
             ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
-      if (requestType != NodeType.NODE_LOCAL
-          && requestType != NodeType.RACK_LOCAL) {
-        requestType = NodeType.OFF_SWITCH;
-      }
+
+      requestLocalityType = requestLocalityType == null ?
+          NodeType.OFF_SWITCH :
+          requestLocalityType;
 
       allocation =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
               node, schedulerKey, reservedContainer, schedulingMode,
               currentResoureLimits);
-      allocation.requestNodeType = requestType;
-      
+
       // When a returned allocation is LOCALITY_SKIPPED, since we're in
       // off-switch request now, we will skip this app w.r.t priorities 
       if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
         allocation.state = AllocationState.APP_SKIPPED;
       }
+      allocation.requestLocalityType = requestLocalityType;
 
       return allocation;
     }
@@ -671,33 +681,27 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   private Container createContainer(FiCaSchedulerNode node, Resource capability,
       SchedulerRequestKey schedulerKey) {
-
     NodeId nodeId = node.getRMNode().getNodeID();
-    ContainerId containerId =
-        BuilderUtils.newContainerId(application.getApplicationAttemptId(),
-            application.getNewContainerId());
 
     // Create the container
-    return BuilderUtils.newContainer(containerId, nodeId,
+    // Now set the containerId to null first, because it is possible the
+    // container will be rejected because of concurrent resource allocation.
+    // new containerId will be generated and assigned to the container
+    // after confirmed.
+    return BuilderUtils.newContainer(null, nodeId,
         node.getRMNode().getHttpAddress(), capability,
         schedulerKey.getPriority(), null,
         schedulerKey.getAllocationRequestId());
   }
-  
+
   private ContainerAllocation handleNewContainerAllocation(
       ContainerAllocation allocationResult, FiCaSchedulerNode node,
-      SchedulerRequestKey schedulerKey, RMContainer reservedContainer,
-      Container container) {
-    // Handling container allocation
-    // Did we previously reserve containers at this 'priority'?
-    if (reservedContainer != null) {
-      application.unreserve(schedulerKey, node, reservedContainer);
-    }
-    
+      SchedulerRequestKey schedulerKey, Container container) {
     // Inform the application
-    RMContainer allocatedContainer =
-        application.allocate(allocationResult.containerNodeType, node,
-            schedulerKey, lastResourceRequest, container);
+    RMContainer allocatedContainer = application.allocate(node, schedulerKey,
+        lastResourceRequest, container);
+
+    allocationResult.updatedContainer = allocatedContainer;
 
     // Does the application need this resource?
     if (allocatedContainer == null) {
@@ -710,13 +714,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
       return ret;
     }
-
-    // Inform the node
-    node.allocateContainer(allocatedContainer);
-    
-    // update locality statistics
-    application.incNumAllocatedContainers(allocationResult.containerNodeType,
-        allocationResult.requestNodeType);
     
     return allocationResult;    
   }
@@ -743,14 +740,18 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
       // When allocating container
-      allocationResult =
-          handleNewContainerAllocation(allocationResult, node, schedulerKey,
-              reservedContainer, container);
+      allocationResult = handleNewContainerAllocation(allocationResult, node,
+          schedulerKey, container);
     } else {
       // When reserving container
-      application.reserve(schedulerKey, node, reservedContainer, container);
+      RMContainer updatedContainer = reservedContainer;
+      if (updatedContainer == null) {
+        updatedContainer = new RMContainerImpl(container,
+            application.getApplicationAttemptId(), node.getNodeID(),
+            application.getAppSchedulingInfo().getUser(), rmContext);
+      }
+      allocationResult.updatedContainer = updatedContainer;
     }
-    allocationResult.updatedContainer = container;
 
     // Only reset opportunities when we FIRST allocate the container. (IAW, When
     // reservedContainer != null, it's not the first time)
@@ -788,16 +789,46 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation allocate(Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
       RMContainer reservedContainer) {
-    ContainerAllocation result =
-        preAllocation(clusterResource, node, schedulingMode, resourceLimits,
-            schedulerKey, reservedContainer);
+    // Do checks before determining which node to allocate
+    // Directly return if this check fails.
+    ContainerAllocation result;
+    if (reservedContainer == null) {
+      result = preCheckForPlacementSet(clusterResource, ps, schedulingMode,
+          resourceLimits, schedulerKey);
+      if (null != result) {
+        return result;
+      }
+    } else {
+      // pre-check when allocating reserved container
+      if (application.getTotalRequiredResources(schedulerKey) == 0) {
+        // Release
+        return new ContainerAllocation(reservedContainer, null,
+            AllocationState.QUEUE_SKIPPED);
+      }
+    }
+
+    SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
+        application.getAppSchedulingInfo().getSchedulingPlacementSet(
+            schedulerKey);
+
+    result = ContainerAllocation.PRIORITY_SKIPPED;
 
-    if (AllocationState.ALLOCATED == result.state
-        || AllocationState.RESERVED == result.state) {
-      result = doAllocation(result, node, schedulerKey, reservedContainer);
+    Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
+        ps);
+    while (iter.hasNext()) {
+      FiCaSchedulerNode node = iter.next();
+
+      result = tryAllocateOnNode(clusterResource, node, schedulingMode,
+          resourceLimits, schedulerKey, reservedContainer);
+
+      if (AllocationState.ALLOCATED == result.state
+          || AllocationState.RESERVED == result.state) {
+        result = doAllocation(result, node, schedulerKey, reservedContainer);
+        break;
+      }
     }
 
     return result;
@@ -805,17 +836,19 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits,
       RMContainer reservedContainer) {
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+
     if (reservedContainer == null) {
       // Check if application needs more resource, skip if it doesn't need more.
       if (!application.hasPendingResourceRequest(rc,
-          node.getPartition(), clusterResource, schedulingMode)) {
+          ps.getPartition(), clusterResource, schedulingMode)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
               + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-label=" + node.getPartition());
+              + schedulingMode.name() + " node-label=" + ps.getPartition());
         }
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
             activitiesManager, node, application, application.getPriority(),
@@ -826,7 +859,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       // Schedule in priority order
       for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
         ContainerAllocation result =
-            allocate(clusterResource, node, schedulingMode, resourceLimits,
+            allocate(clusterResource, ps, schedulingMode, resourceLimits,
                 schedulerKey, null);
 
         AllocationState allocationState = result.getAllocationState();
@@ -845,7 +878,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       return CSAssignment.SKIP_ASSIGNMENT;
     } else {
       ContainerAllocation result =
-          allocate(clusterResource, node, schedulingMode, resourceLimits,
+          allocate(clusterResource, ps, schedulingMode, resourceLimits,
               reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
           reservedContainer, node);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
index aad3bc7..63d8a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
@@ -38,11 +38,13 @@ public class AssignmentInformation {
   }
 
   public static class AssignmentDetails {
+    public RMContainer rmContainer;
     public ContainerId containerId;
     public String queue;
 
-    public AssignmentDetails(ContainerId containerId, String queue) {
-      this.containerId = containerId;
+    public AssignmentDetails(RMContainer rmContainer, String queue) {
+      this.containerId = rmContainer.getContainerId();
+      this.rmContainer = rmContainer;
       this.queue = queue;
     }
   }
@@ -58,7 +60,7 @@ public class AssignmentInformation {
     for (Operation op : Operation.values()) {
       operationCounts.put(op, 0);
       operationResources.put(op, Resource.newInstance(0, 0));
-      operationDetails.put(op, new ArrayList<AssignmentDetails>());
+      operationDetails.put(op, new ArrayList<>());
     }
   }
 
@@ -98,17 +100,17 @@ public class AssignmentInformation {
     return operationResources.get(Operation.RESERVATION);
   }
 
-  private void addAssignmentDetails(Operation op, ContainerId containerId,
+  private void addAssignmentDetails(Operation op, RMContainer rmContainer,
       String queue) {
-    operationDetails.get(op).add(new AssignmentDetails(containerId, queue));
+    operationDetails.get(op).add(new AssignmentDetails(rmContainer, queue));
   }
 
-  public void addAllocationDetails(ContainerId containerId, String queue) {
-    addAssignmentDetails(Operation.ALLOCATION, containerId, queue);
+  public void addAllocationDetails(RMContainer rmContainer, String queue) {
+    addAssignmentDetails(Operation.ALLOCATION, rmContainer, queue);
   }
 
-  public void addReservationDetails(ContainerId containerId, String queue) {
-    addAssignmentDetails(Operation.RESERVATION, containerId, queue);
+  public void addReservationDetails(RMContainer rmContainer, String queue) {
+    addAssignmentDetails(Operation.RESERVATION, rmContainer, queue);
   }
 
   public List<AssignmentDetails> getAllocationDetails() {
@@ -119,23 +121,31 @@ public class AssignmentInformation {
     return operationDetails.get(Operation.RESERVATION);
   }
 
-  private ContainerId getFirstContainerIdFromOperation(Operation op) {
+  private RMContainer getFirstRMContainerFromOperation(Operation op) {
     if (null != operationDetails.get(op)) {
       List<AssignmentDetails> assignDetails =
           operationDetails.get(op);
       if (!assignDetails.isEmpty()) {
-        return assignDetails.get(0).containerId;
+        return assignDetails.get(0).rmContainer;
       }
     }
     return null;
   }
 
+  public RMContainer getFirstAllocatedOrReservedRMContainer() {
+    RMContainer rmContainer;
+    rmContainer = getFirstRMContainerFromOperation(Operation.ALLOCATION);
+    if (null != rmContainer) {
+      return rmContainer;
+    }
+    return getFirstRMContainerFromOperation(Operation.RESERVATION);
+  }
+
   public ContainerId getFirstAllocatedOrReservedContainerId() {
-    ContainerId containerId;
-    containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
-    if (null != containerId) {
-      return containerId;
+    RMContainer rmContainer = getFirstAllocatedOrReservedRMContainer();
+    if (null != rmContainer) {
+      return rmContainer.getContainerId();
     }
-    return getFirstContainerIdFromOperation(Operation.RESERVATION);
+    return null;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
new file mode 100644
index 0000000..ac83d6f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Proposal to allocate/reserve one new container, releasing toRelease first.
+ */
+public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
+    N extends SchedulerNode> {
+  // Container we allocated or reserved (the constructor does not null-check it)
+  private SchedulerContainer<A, N> allocatedOrReservedContainer;
+
+  // Containers we need to release before allocating or reserving the
+  // new container; never null (a null constructor argument keeps this default)
+  private List<SchedulerContainer<A, N>> toRelease = Collections.emptyList();
+
+  // Set when trying to allocate from a reserved container; that container is
+  // not included in the toRelease list
+  private SchedulerContainer<A, N> allocateFromReservedContainer;
+
+  private boolean isIncreasedAllocation; // presumably a resize request; confirm
+
+  private NodeType allocationLocalityType; // presumably achieved locality; confirm
+
+  private NodeType requestLocalityType; // locality level of the attempted request
+
+  private SchedulingMode schedulingMode;
+
+  private Resource allocatedResource; // newly allocated or reserved resource
+
+  public ContainerAllocationProposal(
+      SchedulerContainer<A, N> allocatedOrReservedContainer,
+      List<SchedulerContainer<A, N>> toRelease,
+      SchedulerContainer<A, N> allocateFromReservedContainer,
+      boolean isIncreasedAllocation, NodeType allocationLocalityType,
+      NodeType requestLocalityType, SchedulingMode schedulingMode,
+      Resource allocatedResource) {
+    this.allocatedOrReservedContainer = allocatedOrReservedContainer;
+    if (null != toRelease) { // null keeps the empty-list default
+      this.toRelease = toRelease;
+    }
+    this.allocateFromReservedContainer = allocateFromReservedContainer;
+    this.isIncreasedAllocation = isIncreasedAllocation;
+    this.allocationLocalityType = allocationLocalityType;
+    this.requestLocalityType = requestLocalityType;
+    this.schedulingMode = schedulingMode;
+    this.allocatedResource = allocatedResource;
+  }
+
+  public SchedulingMode getSchedulingMode() {
+    return schedulingMode;
+  }
+
+  public Resource getAllocatedOrReservedResource() {
+    return allocatedResource;
+  }
+
+  public NodeType getAllocationLocalityType() {
+    return allocationLocalityType;
+  }
+
+  public boolean isIncreasedAllocation() {
+    return isIncreasedAllocation;
+  }
+
+  public SchedulerContainer<A, N> getAllocateFromReservedContainer() {
+    return allocateFromReservedContainer;
+  }
+
+  public SchedulerContainer<A, N> getAllocatedOrReservedContainer() {
+    return allocatedOrReservedContainer;
+  }
+
+  public List<SchedulerContainer<A, N>> getToRelease() {
+    return toRelease;
+  }
+
+  @Override
+  public String toString() {
+    return allocatedOrReservedContainer.toString(); // NOTE(review): NPE if null
+  }
+
+  public NodeType getRequestLocalityType() {
+    return requestLocalityType;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java
new file mode 100644
index 0000000..bdea97d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Scheduler implements this to support multi-threaded, global scheduling;
+ * NOTE(review): "cluster" is presumably the total cluster resource -- confirm.
+ */
+public interface ResourceAllocationCommitter {
+  void tryCommit(Resource cluster, ResourceCommitRequest proposal); // raw type
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java
new file mode 100644
index 0000000..5aca202
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.Collections;
+import java.util.List;
+/** Allocate/reserve proposals and releases, plus their resource totals. */
+public class ResourceCommitRequest<A extends SchedulerApplicationAttempt,
+    N extends SchedulerNode> {
+  // Proposals for new containers to be allocated; never null
+  private List<ContainerAllocationProposal<A, N>> containersToAllocate =
+      Collections.emptyList();
+
+  // Proposals for new containers to be reserved; never null
+  private List<ContainerAllocationProposal<A, N>> containersToReserve =
+      Collections.emptyList();
+
+  // Containers released unconditionally (we don't need them anymore)
+  private List<SchedulerContainer<A, N>> toReleaseContainers =
+      Collections.emptyList();
+
+  private Resource totalAllocatedResource; // sum over containersToAllocate
+  private Resource totalReservedResource; // sum over containersToReserve
+  private Resource totalReleasedResource; // all releases, see constructor
+
+  public ResourceCommitRequest( // null arguments keep the empty defaults
+      List<ContainerAllocationProposal<A, N>> containersToAllocate,
+      List<ContainerAllocationProposal<A, N>> containersToReserve,
+      List<SchedulerContainer<A, N>> toReleaseContainers) {
+    if (null != containersToAllocate) {
+      this.containersToAllocate = containersToAllocate;
+    }
+    if (null != containersToReserve) {
+      this.containersToReserve = containersToReserve;
+    }
+    if (null != toReleaseContainers) {
+      this.toReleaseContainers = toReleaseContainers;
+    }
+    // Lists are kept by reference; the totals below are computed only once.
+    totalAllocatedResource = Resources.createResource(0);
+    totalReservedResource = Resources.createResource(0);
+
+    /*
+     * The total-release resource has two parts:
+     * 1) Unconditional release: for example, an app reserved a container,
+     *    but the app doesn't have any pending resource.
+     * 2) Conditional release: for example, continuous reservation looking, or
+     *    lazy preemption -- where we need to kill some containers to allocate
+     *    or reserve the new container.
+     *
+     * The 2nd part is held in
+     * ContainerAllocationProposal#toRelease, which means we will kill/release
+     * these containers to allocate/reserve the given container.
+     *
+     * So we need to account for both conditional and unconditional to-release
+     * containers in the total release-able resource.
+     */
+    totalReleasedResource = Resources.createResource(0);
+
+    for (ContainerAllocationProposal<A,N> c : this.containersToAllocate) {
+      Resources.addTo(totalAllocatedResource,
+          c.getAllocatedOrReservedResource());
+      for (SchedulerContainer<A,N> r : c.getToRelease()) { // conditional
+        Resources.addTo(totalReleasedResource,
+            r.getRmContainer().getAllocatedOrReservedResource());
+      }
+    }
+
+    for (ContainerAllocationProposal<A,N> c : this.containersToReserve) {
+      Resources.addTo(totalReservedResource,
+          c.getAllocatedOrReservedResource());
+      for (SchedulerContainer<A,N> r : c.getToRelease()) { // conditional
+        Resources.addTo(totalReleasedResource,
+            r.getRmContainer().getAllocatedOrReservedResource());
+      }
+    }
+
+    for (SchedulerContainer<A,N> r : this.toReleaseContainers) { // unconditional
+      Resources.addTo(totalReleasedResource,
+          r.getRmContainer().getAllocatedOrReservedResource());
+    }
+  }
+
+  public List<ContainerAllocationProposal<A, N>> getContainersToAllocate() {
+    return containersToAllocate;
+  }
+
+  public List<ContainerAllocationProposal<A, N>> getContainersToReserve() {
+    return containersToReserve;
+  }
+
+  public List<SchedulerContainer<A, N>> getContainersToRelease() {
+    return toReleaseContainers;
+  }
+
+  public Resource getTotalAllocatedResource() {
+    return totalAllocatedResource;
+  }
+
+  public Resource getTotalReservedResource() {
+    return totalReservedResource;
+  }
+
+  public Resource getTotalReleasedResource() {
+    return totalReleasedResource;
+  }
+
+  /*
+   * Convenience helpers: is anything allocated/reserved, and which comes first
+   */
+  public boolean anythingAllocatedOrReserved() {
+    return (!containersToAllocate.isEmpty()) || (!containersToReserve
+        .isEmpty());
+  }
+
+  public ContainerAllocationProposal<A, N> getFirstAllocatedOrReservedContainer() {
+    ContainerAllocationProposal<A, N> c = null;
+    if (!containersToAllocate.isEmpty()) { // allocations take precedence
+      c = containersToAllocate.get(0);
+    }
+    if (c == null && !containersToReserve.isEmpty()) {
+      c = containersToReserve.get(0);
+    }
+
+    return c; // null when both lists are empty
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("New " + getClass().getName() + ":" + "\n");
+    if (null != containersToAllocate && !containersToAllocate.isEmpty()) {
+      sb.append("\t ALLOCATED=" + containersToAllocate.toString());
+    }
+    if (null != containersToReserve && !containersToReserve.isEmpty()) {
+      sb.append("\t RESERVED=" + containersToReserve.toString());
+    }
+    if (null != toReleaseContainers && !toReleaseContainers.isEmpty()) {
+      sb.append("\t RELEASED=" + toReleaseContainers.toString());
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message