hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [11/13] hadoop git commit: YARN-3983. Refactored CapacityScheduler#FiCaSchedulerApp to make container allocation logic easier to extend. Contributed by Wangda Tan
Date Wed, 05 Aug 2015 22:40:49 GMT
YARN-3983. Refactored CapacityScheduler#FiCaSchedulerApp to make container allocation logic easier to extend. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba2313d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba2313d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba2313d6

Branch: refs/heads/HADOOP-12111
Commit: ba2313d6145a1234777938a747187373f4cd58d9
Parents: f271d37
Author: Jian He <jianhe@apache.org>
Authored: Wed Aug 5 13:45:17 2015 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Wed Aug 5 13:47:40 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     |   8 +-
 .../scheduler/capacity/CSAssignment.java        |  11 +-
 .../scheduler/capacity/LeafQueue.java           |  10 +-
 .../scheduler/capacity/ParentQueue.java         |   4 +-
 .../capacity/allocator/AllocationState.java     |  28 +
 .../capacity/allocator/ContainerAllocation.java |  76 +++
 .../capacity/allocator/ContainerAllocator.java  | 115 ++++
 .../allocator/RegularContainerAllocator.java    | 629 ++++++++++++++++++
 .../scheduler/common/fica/FiCaSchedulerApp.java | 658 ++-----------------
 .../scheduler/capacity/TestLeafQueue.java       | 161 ++---
 11 files changed, 1011 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ba023a5..cff7f6b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -391,6 +391,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2768. Avoid cloning Resource in FSAppAttempt#updateDemand.
     (Hong Zhiguo via kasha)
 
+    YARN-3983. Refactored CapacityScheduler#FiCaSchedulerApp to make container
+    allocation logic easier to extend. (Wangda Tan via jianhe)
+
   BUG FIXES
 
     YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index dcc4205..134b941 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -53,13 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.collect.Sets;
 
 public abstract class AbstractCSQueue implements CSQueue {
-  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
-  
-  static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-  
-  static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-  
+  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);  
   CSQueue parent;
   final String queueName;
   volatile int numContainers;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index ceb6f7e..928437f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -24,12 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
 @Unstable
 public class CSAssignment {
+  public static final CSAssignment NULL_ASSIGNMENT =
+      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
 
-  final private Resource resource;
+  public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+
+  private Resource resource;
   private NodeType type;
   private RMContainer excessReservation;
   private FiCaSchedulerApp application;
@@ -67,6 +72,10 @@ public class CSAssignment {
   public Resource getResource() {
     return resource;
   }
+  
+  public void setResource(Resource resource) {
+    this.resource = resource;
+  }
 
   public NodeType getType() {
     return type;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index acfbad0..a71cc68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -777,7 +777,7 @@ public class LeafQueue extends AbstractCSQueue {
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
 
     // Check if this queue need more resource, simply skip allocation if this
@@ -789,7 +789,7 @@ public class LeafQueue extends AbstractCSQueue {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
 
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
@@ -800,7 +800,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
           currentResourceLimits, application.getCurrentReservation(),
           schedulingMode)) {
-        return NULL_ASSIGNMENT;
+        return CSAssignment.NULL_ASSIGNMENT;
       }
       
       Resource userLimit =
@@ -846,11 +846,11 @@ public class LeafQueue extends AbstractCSQueue {
       } else if (!assignment.getSkipped()) {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications
-        return NULL_ASSIGNMENT;
+        return CSAssignment.NULL_ASSIGNMENT;
       }
     }
 
-    return NULL_ASSIGNMENT;
+    return CSAssignment.NULL_ASSIGNMENT;
   }
 
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index e54b9e2..725aea1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -384,7 +384,7 @@ public class ParentQueue extends AbstractCSQueue {
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
     
     // Check if this queue need more resource, simply skip allocation if this
@@ -396,7 +396,7 @@ public class ParentQueue extends AbstractCSQueue {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
     
     CSAssignment assignment = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
new file mode 100644
index 0000000..d1580bd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+public enum AllocationState {
+  APP_SKIPPED,
+  PRIORITY_SKIPPED,
+  LOCALITY_SKIPPED,
+  QUEUE_SKIPPED,
+  ALLOCATED,
+  RESERVED
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
new file mode 100644
index 0000000..00c1bb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+public class ContainerAllocation {
+  public static final ContainerAllocation PRIORITY_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
+  
+  public static final ContainerAllocation APP_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
+
+  public static final ContainerAllocation QUEUE_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
+  
+  public static final ContainerAllocation LOCALITY_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
+  
+  RMContainer containerToBeUnreserved;
+  private Resource resourceToBeAllocated = Resources.none();
+  AllocationState state;
+  NodeType containerNodeType = NodeType.NODE_LOCAL;
+  NodeType requestNodeType = NodeType.NODE_LOCAL;
+  Container updatedContainer;
+
+  public ContainerAllocation(RMContainer containerToBeUnreserved,
+      Resource resourceToBeAllocated, AllocationState state) {
+    this.containerToBeUnreserved = containerToBeUnreserved;
+    this.resourceToBeAllocated = resourceToBeAllocated;
+    this.state = state;
+  }
+  
+  public RMContainer getContainerToBeUnreserved() {
+    return containerToBeUnreserved;
+  }
+  
+  public Resource getResourceToBeAllocated() {
+    if (resourceToBeAllocated == null) {
+      return Resources.none();
+    }
+    return resourceToBeAllocated;
+  }
+  
+  public AllocationState getAllocationState() {
+    return state;
+  }
+  
+  public NodeType getContainerNodeType() {
+    return containerNodeType;
+  }
+  
+  public Container getUpdatedContainer() {
+    return updatedContainer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
new file mode 100644
index 0000000..b4168dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Given an application, resource limits and resource requests, decide how to
+ * allocate a container. This makes the application resource allocation logic
+ * extensible.
+ */
+public abstract class ContainerAllocator {
+  FiCaSchedulerApp application;
+  final ResourceCalculator rc;
+  final RMContext rmContext;
+  
+  public ContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext) {
+    this.application = application;
+    this.rc = rc;
+    this.rmContext = rmContext;
+  }
+  
+  /**
+   * preAllocation performs checks to see whether we can allocate a
+   * container. It puts the necessary information into the returned
+   * {@link ContainerAllocation}.
+   */
+  abstract ContainerAllocation preAllocation(
+      Resource clusterResource, FiCaSchedulerNode node,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      Priority priority, RMContainer reservedContainer);
+  
+  /**
+   * doAllocation updates application metrics, creates containers, etc.,
+   * according to the allocation decision made by preAllocation.
+   */
+  abstract ContainerAllocation doAllocation(
+      ContainerAllocation allocationResult, Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
+      RMContainer reservedContainer);
+  
+  boolean checkHeadroom(Resource clusterResource,
+      ResourceLimits currentResourceLimits, Resource required,
+      FiCaSchedulerNode node) {
+    // If headroom + currentReservation < required, we cannot allocate this
+    // request
+    Resource resourceCouldBeUnReserved = application.getCurrentReservation();
+    if (!application.getCSLeafQueue().getReservationContinueLooking()
+        || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+      // If reservation-continue-looking is disabled, OR we're looking at a
+      // non-default node partition, we don't allow unreserving before
+      // allocation.
+      resourceCouldBeUnReserved = Resources.none();
+    }
+    return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
+        currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
+        required);
+  }
+  
+  /**
+   * allocate needs to handle the following:
+   * 
+   * <ul>
+   * <li>Select request: Select a request to allocate. E.g. select a resource
+   * request based on requirement/priority/locality.</li>
+   * <li>Check if a given resource can be allocated based on resource
+   * availability</li>
+   * <li>Do allocation: this will decide/create allocated/reserved
+   * container, this will also update metrics</li>
+   * </ul>
+   */
+  public ContainerAllocation allocate(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority,
+      RMContainer reservedContainer) {
+    ContainerAllocation result =
+        preAllocation(clusterResource, node, schedulingMode,
+            resourceLimits, priority, reservedContainer);
+    
+    if (AllocationState.ALLOCATED == result.state
+        || AllocationState.RESERVED == result.state) {
+      result = doAllocation(result, clusterResource, node,
+          schedulingMode, priority, reservedContainer);
+    }
+    
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
new file mode 100644
index 0000000..6effcd3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -0,0 +1,629 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Allocates normal (new) containers, considering locality/label, etc. Uses a
+ * delayed scheduling mechanism to get better locality allocation.
+ */
+public class RegularContainerAllocator extends ContainerAllocator {
+  private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
+  
+  private ResourceRequest lastResourceRequest = null;
+  
+  public RegularContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext) {
+    super(application, rc, rmContext);
+  }
+  
+  private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority) {
+    if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
+      return ContainerAllocation.APP_SKIPPED;
+    }
+
+    ResourceRequest anyRequest =
+        application.getResourceRequest(priority, ResourceRequest.ANY);
+    if (null == anyRequest) {
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+
+    // Required resource
+    Resource required = anyRequest.getCapability();
+
+    // Do we need containers at this 'priority'?
+    if (application.getTotalRequiredResources(priority) <= 0) {
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+
+    // AM container allocation doesn't support non-exclusive allocation, to
+    // avoid the pain of preempting an AM container
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      RMAppAttempt rmAppAttempt =
+          rmContext.getRMApps().get(application.getApplicationId())
+              .getCurrentAppAttempt();
+      if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
+          && null == rmAppAttempt.getMasterContainer()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip allocating AM container to app_attempt="
+              + application.getApplicationAttemptId()
+              + ", don't allow to allocate AM container in non-exclusive mode");
+        }
+        return ContainerAllocation.APP_SKIPPED;
+      }
+    }
+
+    // Does the node-label-expression of this off-switch resource request
+    // match the node's partition?
+    // If not, jump to the next priority.
+    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest,
+        node.getPartition(), schedulingMode)) {
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+
+    if (!application.getCSLeafQueue().getReservationContinueLooking()) {
+      if (!shouldAllocOrReserveNewContainer(priority, required)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("doesn't need containers based on reservation algo!");
+        }
+        return ContainerAllocation.PRIORITY_SKIPPED;
+      }
+    }
+
+    if (!checkHeadroom(clusterResource, resourceLimits, required, node)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("cannot allocate required resource=" + required
+            + " because of headroom");
+      }
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+
+    // Inform the application it is about to get a scheduling opportunity
+    application.addSchedulingOpportunity(priority);
+
+    // Increase missed-non-partitioned-resource-request-opportunity.
+    // This is to make sure non-partitioned-resource-request will prefer
+    // to be allocated to non-partitioned nodes
+    int missedNonPartitionedRequestSchedulingOpportunity = 0;
+    if (anyRequest.getNodeLabelExpression()
+        .equals(RMNodeLabelsManager.NO_LABEL)) {
+      missedNonPartitionedRequestSchedulingOpportunity =
+          application
+              .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+    }
+
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      // Before doing allocation, we need to check scheduling opportunity to
+      // make sure : non-partitioned resource request should be scheduled to
+      // non-partitioned partition first.
+      if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
+          .getScheduler().getNumClusterNodes()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+              + " priority=" + priority
+              + " because missed-non-partitioned-resource-request"
+              + " opportunity under requred:" + " Now="
+              + missedNonPartitionedRequestSchedulingOpportunity + " required="
+              + rmContext.getScheduler().getNumClusterNodes());
+        }
+
+        return ContainerAllocation.APP_SKIPPED;
+      }
+    }
+    
+    return null;
+  }
+
+  @Override
+  ContainerAllocation preAllocation(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority,
+      RMContainer reservedContainer) {
+    ContainerAllocation result;
+    if (null == reservedContainer) {
+      // New container: run the pre-checks; a non-null result ends allocation
+      result =
+          preCheckForNewContainer(clusterResource, node, schedulingMode,
+              resourceLimits, priority);
+      if (null != result) {
+        return result;
+      }
+    } else {
+      // Reserved container: check the app still needs anything at this priority
+      if (application.getTotalRequiredResources(priority) == 0) {
+        // Nothing required any more: hand the reservation back for release
+        return new ContainerAllocation(reservedContainer, null,
+            AllocationState.QUEUE_SKIPPED);
+      }
+    }
+
+    // Try to allocate (or reserve) a container on this node
+    result =
+        assignContainersOnNode(clusterResource, node, priority,
+            reservedContainer, schedulingMode, resourceLimits);
+    
+    if (null == reservedContainer) {
+      if (result.state == AllocationState.PRIORITY_SKIPPED) {
+        // Don't count 'skipped nodes' as a scheduling opportunity!
+        application.subtractSchedulingOpportunity(priority);
+      }
+    }
+    
+    return result;
+  }
+  
+  public synchronized float getLocalityWaitFactor(
+      Priority priority, int clusterNodes) {
+    // Estimate required unique resources (hosts + racks): #requests - 1, >= 0
+    int requiredResources = 
+        Math.max(application.getResourceRequests(priority).size() - 1, 0);
+    
+    // Wait factor = required resources / cluster nodes, capped at 1.0,
+    // i.e. no point skipping more than cluster-size opportunities
+    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
+  }
+  
+  private int getActualNodeLocalityDelay() {
+    return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
+        .getCSLeafQueue().getNodeLocalityDelay());
+  }
+
+  private boolean canAssign(Priority priority, FiCaSchedulerNode node,
+      NodeType type, RMContainer reservedContainer) {
+
+    // Off-switch: an already reserved container may always proceed
+    if (type == NodeType.OFF_SWITCH) {
+      if (reservedContainer != null) {
+        return true;
+      }
+
+      // Delay off-switch until missed opportunities exceed required * factor
+      ResourceRequest offSwitchRequest =
+          application.getResourceRequest(priority, ResourceRequest.ANY);
+      long missedOpportunities = application.getSchedulingOpportunities(priority);
+      long requiredContainers = offSwitchRequest.getNumContainers();
+
+      float localityWaitFactor =
+          getLocalityWaitFactor(priority, rmContext.getScheduler()
+              .getNumClusterNodes());
+
+      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
+    }
+
+    // Rack-local and node-local both need outstanding containers on this rack
+    ResourceRequest rackLocalRequest =
+        application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
+
+    // If we are here, we do need containers on this rack for RACK_LOCAL req
+    if (type == NodeType.RACK_LOCAL) {
+      // Delay rack-local until missed opportunities exceed the locality delay
+      long missedOpportunities = application.getSchedulingOpportunities(priority);
+      return getActualNodeLocalityDelay() < missedOpportunities;
+    }
+
+    // Node-local: no delay is applied
+    if (type == NodeType.NODE_LOCAL) {
+      // Assignable only if this host still has outstanding containers
+      ResourceRequest nodeLocalRequest =
+          application.getResourceRequest(priority, node.getNodeName());
+      if (nodeLocalRequest != null) {
+        return nodeLocalRequest.getNumContainers() > 0;
+      }
+    }
+
+    return false;
+  }
+
+  private ContainerAllocation assignNodeLocalContainers(
+      Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
+          schedulingMode, currentResoureLimits);
+    }
+
+    // Not assignable node-locally now; the caller moves on to rack-local
+    return ContainerAllocation.LOCALITY_SKIPPED;
+  }
+
+  private ContainerAllocation assignRackLocalContainers(
+      Resource clusterResource, ResourceRequest rackLocalResourceRequest,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
+          schedulingMode, currentResoureLimits);
+    }
+
+    // Not assignable rack-locally now; the caller moves on to off-switch
+    return ContainerAllocation.LOCALITY_SKIPPED;
+  }
+
+  private ContainerAllocation assignOffSwitchContainers(
+      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+          schedulingMode, currentResoureLimits);
+    }
+
+    return ContainerAllocation.QUEUE_SKIPPED;
+  }
+
+  private ContainerAllocation assignContainersOnNode(Resource clusterResource,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+
+    ContainerAllocation assigned;
+
+    NodeType requestType = null;
+    // Node-local (data-local): try the request for this exact host first
+    ResourceRequest nodeLocalResourceRequest =
+        application.getResourceRequest(priority, node.getNodeName());
+    if (nodeLocalResourceRequest != null) {
+      requestType = NodeType.NODE_LOCAL;
+      assigned =
+          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
+              node, priority, reservedContainer, schedulingMode,
+              currentResoureLimits);
+      if (Resources.greaterThan(rc, clusterResource,
+          assigned.getResourceToBeAllocated(), Resources.none())) {
+        assigned.requestNodeType = requestType;
+        return assigned;
+      }
+    }
+
+    // Rack-local: attempted only if the rack request relaxes locality
+    ResourceRequest rackLocalResourceRequest =
+        application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalResourceRequest != null) {
+      if (!rackLocalResourceRequest.getRelaxLocality()) {
+        return ContainerAllocation.PRIORITY_SKIPPED;
+      }
+
+      if (requestType != NodeType.NODE_LOCAL) {
+        requestType = NodeType.RACK_LOCAL;
+      }
+
+      assigned =
+          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
+              node, priority, reservedContainer, schedulingMode,
+              currentResoureLimits);
+      if (Resources.greaterThan(rc, clusterResource,
+          assigned.getResourceToBeAllocated(), Resources.none())) {
+        assigned.requestNodeType = requestType;
+        return assigned;
+      }
+    }
+
+    // Off-switch: last attempt, again only if the ANY request relaxes locality
+    ResourceRequest offSwitchResourceRequest =
+        application.getResourceRequest(priority, ResourceRequest.ANY);
+    if (offSwitchResourceRequest != null) {
+      if (!offSwitchResourceRequest.getRelaxLocality()) {
+        return ContainerAllocation.PRIORITY_SKIPPED;
+      }
+      if (requestType != NodeType.NODE_LOCAL
+          && requestType != NodeType.RACK_LOCAL) {
+        requestType = NodeType.OFF_SWITCH;
+      }
+
+      assigned =
+          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+              node, priority, reservedContainer, schedulingMode,
+              currentResoureLimits);
+      assigned.requestNodeType = requestType;
+
+      return assigned;
+    }
+
+    return ContainerAllocation.PRIORITY_SKIPPED;
+  }
+
+  private ContainerAllocation assignContainer(Resource clusterResource,
+      FiCaSchedulerNode node, Priority priority, ResourceRequest request,
+      NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
+    lastResourceRequest = request;
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("assignContainers: node=" + node.getNodeName()
+        + " application=" + application.getApplicationId()
+        + " priority=" + priority.getPriority()
+        + " request=" + request + " type=" + type);
+    }
+
+    // Check that the resource request may use this node's partition
+    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
+        node.getPartition(), schedulingMode)) {
+      // The request does not match the node's partition. If this was a
+      // reserved container (e.g. the node's labels changed after it was
+      // reserved), it is handed back so that it can be un-reserved.
+      return new ContainerAllocation(rmContainer, null,
+          AllocationState.QUEUE_SKIPPED);
+    }
+
+    Resource capability = request.getCapability();
+    Resource available = node.getAvailableResource();
+    Resource totalResource = node.getTotalResource();
+
+    if (!Resources.lessThanOrEqual(rc, clusterResource,
+        capability, totalResource)) {
+      LOG.warn("Node : " + node.getNodeID()
+          + " does not have sufficient resource for request : " + request
+          + " node total capability : " + node.getTotalResource());
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+
+    assert Resources.greaterThan(
+        rc, clusterResource, available, Resources.none());
+
+    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+        priority, capability);
+
+    // How many containers of this size fit into the node's available space?
+    int availableContainers =
+        rc.computeAvailableContainers(available, capability);
+
+    // Amount that has to be unreserved first:
+    // max(required - headroom, amountNeededUnreserve)
+    Resource resourceNeedToUnReserve =
+        Resources.max(rc, clusterResource,
+            Resources.subtract(capability, currentResoureLimits.getHeadroom()),
+            currentResoureLimits.getAmountNeededUnreserve());
+
+    boolean needToUnreserve =
+        Resources.greaterThan(rc, clusterResource,
+            resourceNeedToUnReserve, Resources.none());
+
+    RMContainer unreservedContainer = null;
+    boolean reservationsContinueLooking =
+        application.getCSLeafQueue().getReservationContinueLooking();
+
+    if (availableContainers > 0) {
+      // Allocate...
+      // Continuous reservation looking only applies when we are not
+      // allocating from a reserved container (and the node has no labels)
+      if (rmContainer == null && reservationsContinueLooking
+          && node.getLabels().isEmpty()) {
+        // when reservationsContinueLooking is set, we may need to unreserve
+        // some containers to meet this queue, its parents', or the users'
+        // resource limits.
+        // TODO, need change here when we want to support continuous reservation
+        // looking for labeled partitions.
+        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
+          if (!needToUnreserve) {
+            // Nothing has to be unreserved for the limits, but we shouldn't
+            // allocate/reserve a new container either: swap an existing
+            // reservation of the requested size for this allocation
+            // (currentResoureLimits.getAmountNeededUnreserve may be zero).
+            // When a limit was hit, the amount computed above is used as is.
+            resourceNeedToUnReserve = capability;
+          }
+          unreservedContainer =
+              application.findNodeToUnreserve(clusterResource, node, priority,
+                  resourceNeedToUnReserve);
+          // We get here when either the minimum amount to unreserve is > 0
+          // or we are not allowed to allocate/reserve a new container, i.e.
+          // we *have to* unreserve some resource before continuing. If no
+          // reservation could be found to give up, this allocation cannot
+          // proceed.
+          if (null == unreservedContainer) {
+            return ContainerAllocation.QUEUE_SKIPPED;
+          }
+        }
+      }
+
+      ContainerAllocation result =
+          new ContainerAllocation(unreservedContainer, request.getCapability(),
+              AllocationState.ALLOCATED);
+      result.containerNodeType = type;
+      return result;
+    } else {
+      // No room on this node: reserve if we are allowed to allocate/reserve
+      // a new container, or re-reserve if this already was a reservation
+      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+        if (reservationsContinueLooking && rmContainer == null) {
+          // With reservationsContinueLooking set, queue capacity or user
+          // limits may have been ignored so far. Make sure we don't need to
+          // unreserve anything before making a new reservation.
+          if (needToUnreserve) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("we needed to unreserve to be able to allocate");
+            }
+            return ContainerAllocation.QUEUE_SKIPPED;
+          }
+        }
+
+        ContainerAllocation result =
+            new ContainerAllocation(null, request.getCapability(),
+                AllocationState.RESERVED);
+        result.containerNodeType = type;
+        return result;
+      }
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+  }
+
+  boolean
+      shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
+    int requiredContainers = application.getTotalRequiredResources(priority);
+    int reservedContainers = application.getNumReservedContainers(priority);
+    int starvation = 0;
+    if (reservedContainers > 0) {
+      float nodeFactor =
+          Resources
+              .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation());
+
+      // Weight the re-reservations per reserved container by (1 - share of
+      // the maximum allocation requested) to bias against large containers;
+      // Math.min(nodeFactor, minimumAllocationFactor) guards whole-node asks
+      starvation =
+          (int) ((application.getReReservations(priority) / 
+              (float) reservedContainers) * (1.0f - (Math.min(
+                  nodeFactor, application.getCSLeafQueue()
+                  .getMinimumAllocationFactor()))));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("needsContainers:" + " app.#re-reserve="
+            + application.getReReservations(priority) + " reserved="
+            + reservedContainers + " nodeFactor=" + nodeFactor
+            + " minAllocFactor="
+            + application.getCSLeafQueue().getMinimumAllocationFactor()
+            + " starvation=" + starvation);
+      }
+    }
+    return (((starvation + requiredContainers) - reservedContainers) > 0);
+  }
+  
+  private Container getContainer(RMContainer rmContainer,
+      FiCaSchedulerNode node, Resource capability, Priority priority) {
+    return (rmContainer != null) ? rmContainer.getContainer()
+        : createContainer(node, capability, priority);
+  }
+
+  private Container createContainer(FiCaSchedulerNode node, Resource capability,
+      Priority priority) {
+
+    NodeId nodeId = node.getRMNode().getNodeID();
+    ContainerId containerId =
+        BuilderUtils.newContainerId(application.getApplicationAttemptId(),
+            application.getNewContainerId());
+
+    // Create the container
+    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
+        .getHttpAddress(), capability, priority, null);
+  }
+  
+  private ContainerAllocation handleNewContainerAllocation(
+      ContainerAllocation allocationResult, FiCaSchedulerNode node,
+      Priority priority, RMContainer reservedContainer, Container container) {
+    // We are allocating for real: if this came from a reservation at this
+    // 'priority' on this node, give the reservation up first
+    if (reservedContainer != null) {
+      application.unreserve(priority, node, reservedContainer);
+    }
+    
+    // Inform the application
+    RMContainer allocatedContainer =
+        application.allocate(allocationResult.containerNodeType, node,
+            priority, lastResourceRequest, container);
+
+    // A null result means the application did not take the container
+    if (allocatedContainer == null) {
+      // Skip this app, but still carry over the container to be unreserved
+      ContainerAllocation ret =
+          new ContainerAllocation(allocationResult.containerToBeUnreserved,
+              null, AllocationState.QUEUE_SKIPPED);
+      ret.state = AllocationState.APP_SKIPPED;
+      return ret;
+    }
+
+    // Inform the node
+    node.allocateContainer(allocatedContainer);
+    
+    // update locality statistics
+    application.incNumAllocatedContainers(allocationResult.containerNodeType,
+        allocationResult.requestNodeType);
+    
+    return allocationResult;    
+  }
+
+  @Override
+  ContainerAllocation doAllocation(ContainerAllocation allocationResult,
+      Resource clusterResource, FiCaSchedulerNode node,
+      SchedulingMode schedulingMode, Priority priority,
+      RMContainer reservedContainer) {
+    // Reuse the reserved container if there is one, otherwise create a new one
+    Container container =
+        getContainer(reservedContainer, node,
+            allocationResult.getResourceToBeAllocated(), priority);
+
+    // something went wrong getting/creating the container
+    if (container == null) {
+      LOG.warn("Couldn't get container for allocation!");
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+
+    if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
+      // Allocating: the result may be replaced by an APP_SKIPPED one
+      allocationResult =
+          handleNewContainerAllocation(allocationResult, node, priority,
+              reservedContainer, container);
+    } else {
+      // Reserving (or re-reserving) the container on this node
+      application.reserve(priority, node, reservedContainer, container);
+    }
+    allocationResult.updatedContainer = container;
+
+    // Only reset opportunities when we FIRST allocate the container (i.e. when
+    // reservedContainer != null, it's not the first time)
+    if (reservedContainer == null) {
+      // Don't reset scheduling opportunities for off-switch assignments
+      // otherwise the app will be delayed for each non-local assignment.
+      // This helps apps with many off-switch requests schedule faster.
+      if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Resetting scheduling opportunities");
+        }
+        application.resetSchedulingOpportunities(priority);
+      }
+      
+      // Non-exclusive scheduling opportunity is different: we need reset
+      // it every time to make sure non-labeled resource request will be
+      // most likely allocated on non-labeled nodes first.
+      application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+    }
+
+    return allocationResult;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba2313d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index c660fcb..d75b2c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,9 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -54,15 +51,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -78,11 +76,6 @@ import com.google.common.annotations.VisibleForTesting;
 public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 
-  static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-
-  static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-
   private final Set<ContainerId> containersToPreempt =
     new HashSet<ContainerId>();
     
@@ -91,6 +84,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   private ResourceCalculator rc = new DefaultResourceCalculator();
 
   private ResourceScheduler scheduler;
+  
+  private ContainerAllocator containerAllocator;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -124,6 +119,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (scheduler.getResourceCalculator() != null) {
       rc = scheduler.getResourceCalculator();
     }
+    
+    containerAllocator = new RegularContainerAllocator(this, rc, rmContext);
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -386,223 +383,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
   }
 
-  private int getActualNodeLocalityDelay() {
-    return Math.min(scheduler.getNumClusterNodes(), getCSLeafQueue()
-        .getNodeLocalityDelay());
-  }
-
-  private boolean canAssign(Priority priority, FiCaSchedulerNode node,
-      NodeType type, RMContainer reservedContainer) {
-
-    // Clearly we need containers for this application...
-    if (type == NodeType.OFF_SWITCH) {
-      if (reservedContainer != null) {
-        return true;
-      }
-
-      // 'Delay' off-switch
-      ResourceRequest offSwitchRequest =
-          getResourceRequest(priority, ResourceRequest.ANY);
-      long missedOpportunities = getSchedulingOpportunities(priority);
-      long requiredContainers = offSwitchRequest.getNumContainers();
-
-      float localityWaitFactor =
-          getLocalityWaitFactor(priority, scheduler.getNumClusterNodes());
-
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
-    }
-
-    // Check if we need containers on this rack
-    ResourceRequest rackLocalRequest =
-        getResourceRequest(priority, node.getRackName());
-    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
-      return false;
-    }
-
-    // If we are here, we do need containers on this rack for RACK_LOCAL req
-    if (type == NodeType.RACK_LOCAL) {
-      // 'Delay' rack-local just a little bit...
-      long missedOpportunities = getSchedulingOpportunities(priority);
-      return getActualNodeLocalityDelay() < missedOpportunities;
-    }
-
-    // Check if we need containers on this host
-    if (type == NodeType.NODE_LOCAL) {
-      // Now check if we need containers on this host...
-      ResourceRequest nodeLocalRequest =
-          getResourceRequest(priority, node.getNodeName());
-      if (nodeLocalRequest != null) {
-        return nodeLocalRequest.getNumContainers() > 0;
-      }
-    }
-
-    return false;
-  }
-
-  boolean
-      shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
-    int requiredContainers = getTotalRequiredResources(priority);
-    int reservedContainers = getNumReservedContainers(priority);
-    int starvation = 0;
-    if (reservedContainers > 0) {
-      float nodeFactor =
-          Resources.ratio(
-              rc, required, getCSLeafQueue().getMaximumAllocation()
-              );
-
-      // Use percentage of node required to bias against large containers...
-      // Protect against corner case where you need the whole node with
-      // Math.min(nodeFactor, minimumAllocationFactor)
-      starvation =
-          (int)((getReReservations(priority) / (float)reservedContainers) *
-                (1.0f - (Math.min(nodeFactor, getCSLeafQueue().getMinimumAllocationFactor())))
-               );
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("needsContainers:" +
-            " app.#re-reserve=" + getReReservations(priority) +
-            " reserved=" + reservedContainers +
-            " nodeFactor=" + nodeFactor +
-            " minAllocFactor=" + getCSLeafQueue().getMinimumAllocationFactor() +
-            " starvation=" + starvation);
-      }
-    }
-    return (((starvation + requiredContainers) - reservedContainers) > 0);
-  }
-
-  private CSAssignment assignNodeLocalContainers(Resource clusterResource,
-      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
-      Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.NODE_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-  }
-
-  private CSAssignment assignRackLocalContainers(Resource clusterResource,
-      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
-      Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.RACK_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
-  }
-
-  private CSAssignment assignOffSwitchContainers(Resource clusterResource,
-      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
-      Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.OFF_SWITCH,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
-  }
-
-  private CSAssignment assignContainersOnNode(Resource clusterResource,
-      FiCaSchedulerNode node, Priority priority,
-      RMContainer reservedContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-
-    CSAssignment assigned;
-
-    NodeType requestType = null;
-    MutableObject allocatedContainer = new MutableObject();
-    // Data-local
-    ResourceRequest nodeLocalResourceRequest =
-        getResourceRequest(priority, node.getNodeName());
-    if (nodeLocalResourceRequest != null) {
-      requestType = NodeType.NODE_LOCAL;
-      assigned =
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
-            node, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(rc, clusterResource,
-        assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          incNumAllocatedContainers(NodeType.NODE_LOCAL,
-            requestType);
-        }
-        assigned.setType(NodeType.NODE_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Rack-local
-    ResourceRequest rackLocalResourceRequest =
-        getResourceRequest(priority, node.getRackName());
-    if (rackLocalResourceRequest != null) {
-      if (!rackLocalResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-
-      if (requestType != NodeType.NODE_LOCAL) {
-        requestType = NodeType.RACK_LOCAL;
-      }
-
-      assigned =
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
-            node, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(rc, clusterResource,
-        assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          incNumAllocatedContainers(NodeType.RACK_LOCAL,
-            requestType);
-        }
-        assigned.setType(NodeType.RACK_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Off-switch
-    ResourceRequest offSwitchResourceRequest =
-        getResourceRequest(priority, ResourceRequest.ANY);
-    if (offSwitchResourceRequest != null) {
-      if (!offSwitchResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-      if (requestType != NodeType.NODE_LOCAL
-          && requestType != NodeType.RACK_LOCAL) {
-        requestType = NodeType.OFF_SWITCH;
-      }
-
-      assigned =
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-            node, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-
-      // update locality statistics
-      if (allocatedContainer.getValue() != null) {
-        incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
-      }
-      assigned.setType(NodeType.OFF_SWITCH);
-      return assigned;
-    }
-
-    return SKIP_ASSIGNMENT;
-  }
-
   public void reserve(Priority priority,
       FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
     // Update reserved metrics if this is the first reservation
@@ -618,25 +398,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     node.reserveResource(this, priority, rmContainer);
   }
 
-  private Container getContainer(RMContainer rmContainer,
-      FiCaSchedulerNode node, Resource capability, Priority priority) {
-    return (rmContainer != null) ? rmContainer.getContainer()
-        : createContainer(node, capability, priority);
-  }
-
-  Container createContainer(FiCaSchedulerNode node, Resource capability,
-      Priority priority) {
-
-    NodeId nodeId = node.getRMNode().getNodeID();
-    ContainerId containerId =
-        BuilderUtils.newContainerId(getApplicationAttemptId(),
-            getNewContainerId());
-
-    // Create the container
-    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-        .getHttpAddress(), capability, priority, null);
-  }
-
   @VisibleForTesting
   public RMContainer findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, Priority priority,
@@ -672,203 +433,63 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return nodeToUnreserve.getReservedContainer();
   }
 
-  private LeafQueue getCSLeafQueue() {
+  public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
 
-  private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
-      Priority priority,
-      ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " application=" + getApplicationId()
-        + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type);
-    }
-
-    // check if the resource request can access the label
-    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
-        node.getPartition(), schedulingMode)) {
-      // this is a reserved container, but we cannot allocate it now according
-      // to label not match. This can be caused by node label changed
-      // We should un-reserve this container.
-      if (rmContainer != null) {
-        unreserve(priority, node, rmContainer);
-      }
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    Resource capability = request.getCapability();
-    Resource available = node.getAvailableResource();
-    Resource totalResource = node.getTotalResource();
-
-    if (!Resources.lessThanOrEqual(rc, clusterResource,
-        capability, totalResource)) {
-      LOG.warn("Node : " + node.getNodeID()
-          + " does not have sufficient resource for request : " + request
-          + " node total capability : " + node.getTotalResource());
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    assert Resources.greaterThan(
-        rc, clusterResource, available, Resources.none());
-
-    // Create the container if necessary
-    Container container =
-        getContainer(rmContainer, node, capability, priority);
-
-    // something went wrong getting/creating the container
-    if (container == null) {
-      LOG.warn("Couldn't get container for allocation!");
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
-        priority, capability);
-
-    // Can we allocate a container on this node?
-    int availableContainers =
-        rc.computeAvailableContainers(available, capability);
-
-    // How much need to unreserve equals to:
-    // max(required - headroom, amountNeedUnreserve)
-    Resource resourceNeedToUnReserve =
-        Resources.max(rc, clusterResource,
-            Resources.subtract(capability, currentResoureLimits.getHeadroom()),
-            currentResoureLimits.getAmountNeededUnreserve());
-
-    boolean needToUnreserve =
-        Resources.greaterThan(rc, clusterResource,
-            resourceNeedToUnReserve, Resources.none());
-
-    RMContainer unreservedContainer = null;
-    boolean reservationsContinueLooking =
-        getCSLeafQueue().getReservationContinueLooking();
-
-    if (availableContainers > 0) {
-      // Allocate...
-
-      // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null) {
-        unreserve(priority, node, rmContainer);
-      } else if (reservationsContinueLooking && node.getLabels().isEmpty()) {
-        // when reservationsContinueLooking is set, we may need to unreserve
-        // some containers to meet this queue, its parents', or the users' resource limits.
-        // TODO, need change here when we want to support continuous reservation
-        // looking for labeled partitions.
-        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
-          if (!needToUnreserve) {
-            // If we shouldn't allocate/reserve new container then we should
-            // unreserve one the same size we are asking for since the
-            // currentResoureLimits.getAmountNeededUnreserve could be zero. If
-            // the limit was hit then use the amount we need to unreserve to be
-            // under the limit.
-            resourceNeedToUnReserve = capability;
-          }
-          unreservedContainer =
-              findNodeToUnreserve(clusterResource, node, priority,
-                  resourceNeedToUnReserve);
-          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
-          // container (That means we *have to* unreserve some resource to
-          // continue)). If we failed to unreserve some resource, we can't continue.
-          if (null == unreservedContainer) {
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-      }
-
-      // Inform the application
-      RMContainer allocatedContainer =
-          allocate(type, node, priority, request, container);
-
-      // Does the application need this resource?
-      if (allocatedContainer == null) {
-        CSAssignment csAssignment =  new CSAssignment(Resources.none(), type);
-        csAssignment.setApplication(this);
-        csAssignment.setExcessReservation(unreservedContainer);
-        return csAssignment;
-      }
-
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
-
-      // Inform the ordering policy
-      getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
-          allocatedContainer);
-
-      LOG.info("assignedContainer" +
-          " application attempt=" + getApplicationAttemptId() +
-          " container=" + container +
-          " queue=" + this +
-          " clusterResource=" + clusterResource);
-      createdContainer.setValue(allocatedContainer);
-      CSAssignment assignment = new CSAssignment(container.getResource(), type);
-      assignment.getAssignmentInformation().addAllocationDetails(
-        container.getId(), getCSLeafQueue().getQueuePath());
-      assignment.getAssignmentInformation().incrAllocations();
-      assignment.setApplication(this);
-      Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-        container.getResource());
-
-      assignment.setExcessReservation(unreservedContainer);
-      return assignment;
-    } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was an already a reserved container, reserve it again
-      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
-
-        if (reservationsContinueLooking && rmContainer == null) {
-          // we could possibly ignoring queue capacity or user limits when
-          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
-          // one.
-          if (needToUnreserve) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("we needed to unreserve to be able to allocate");
-            }
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-
-        // Reserve by 'charging' in advance...
-        reserve(priority, node, rmContainer, container);
-
-        LOG.info("Reserved container " +
-            " application=" + getApplicationId() +
-            " resource=" + request.getCapability() +
-            " queue=" + this.toString() +
-            " cluster=" + clusterResource);
-        CSAssignment assignment =
-            new CSAssignment(request.getCapability(), type);
+  private CSAssignment getCSAssignmentFromAllocateResult(
+      Resource clusterResource, ContainerAllocation result) {
+    // Handle skipped
+    boolean skipped =
+        (result.getAllocationState() == AllocationState.APP_SKIPPED);
+    CSAssignment assignment = new CSAssignment(skipped);
+    assignment.setApplication(this);
+    
+    // Handle excess reservation
+    assignment.setExcessReservation(result.getContainerToBeUnreserved());
+
+    // If we allocated something
+    if (Resources.greaterThan(rc, clusterResource,
+        result.getResourceToBeAllocated(), Resources.none())) {
+      Resource allocatedResource = result.getResourceToBeAllocated();
+      Container updatedContainer = result.getUpdatedContainer();
+      
+      assignment.setResource(allocatedResource);
+      assignment.setType(result.getContainerNodeType());
+
+      if (result.getAllocationState() == AllocationState.RESERVED) {
+        // This is a reserved container
+        LOG.info("Reserved container " + " application=" + getApplicationId()
+            + " resource=" + allocatedResource + " queue="
+            + this.toString() + " cluster=" + clusterResource);
         assignment.getAssignmentInformation().addReservationDetails(
-          container.getId(), getCSLeafQueue().getQueuePath());
+            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
         assignment.getAssignmentInformation().incrReservations();
         Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-          request.getCapability());
-        return assignment;
+            allocatedResource);
+        assignment.setFulfilledReservation(true);
+      } else {
+        // This is a new container
+        // Inform the ordering policy
+        LOG.info("assignedContainer" + " application attempt="
+            + getApplicationAttemptId() + " container="
+            + updatedContainer.getId() + " queue=" + this + " clusterResource="
+            + clusterResource);
+
+        getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
+            getRMContainer(updatedContainer.getId()));
+
+        assignment.getAssignmentInformation().addAllocationDetails(
+            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
+        assignment.getAssignmentInformation().incrAllocations();
+        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+            allocatedResource);
       }
-      return new CSAssignment(Resources.none(), type);
     }
+    
+    return assignment;
   }
-
-  private boolean checkHeadroom(Resource clusterResource,
-      ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) {
-    // If headroom + currentReservation < required, we cannot allocate this
-    // require
-    Resource resourceCouldBeUnReserved = getCurrentReservation();
-    if (!getCSLeafQueue().getReservationContinueLooking() || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
-      // If we don't allow reservation continuous looking, OR we're looking at
-      // non-default node partition, we won't allow to unreserve before
-      // allocation.
-      resourceCouldBeUnReserved = Resources.none();
-    }
-    return Resources
-        .greaterThanOrEqual(rc, clusterResource, Resources.add(
-            currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
-            required);
-  }
-
+  
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode) {
@@ -886,174 +507,41 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-label=" + node.getPartition());
       }
-      return SKIP_ASSIGNMENT;
+      return CSAssignment.SKIP_ASSIGNMENT;
     }
 
     synchronized (this) {
-      // Check if this resource is on the blacklist
-      if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) {
-        return SKIP_ASSIGNMENT;
-      }
-
       // Schedule in priority order
       for (Priority priority : getPriorities()) {
-        ResourceRequest anyRequest =
-            getResourceRequest(priority, ResourceRequest.ANY);
-        if (null == anyRequest) {
-          continue;
-        }
-
-        // Required resource
-        Resource required = anyRequest.getCapability();
-
-        // Do we need containers at this 'priority'?
-        if (getTotalRequiredResources(priority) <= 0) {
-          continue;
-        }
-
-        // AM container allocation doesn't support non-exclusive allocation to
-        // avoid painful of preempt an AM container
-        if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+        ContainerAllocation allocationResult =
+            containerAllocator.allocate(clusterResource, node,
+                schedulingMode, currentResourceLimits, priority, null);
 
-          RMAppAttempt rmAppAttempt =
-              rmContext.getRMApps()
-                  .get(getApplicationId()).getCurrentAppAttempt();
-          if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
-              && null == rmAppAttempt.getMasterContainer()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skip allocating AM container to app_attempt="
-                  + getApplicationAttemptId()
-                  + ", don't allow to allocate AM container in non-exclusive mode");
-            }
-            break;
-          }
-        }
+        // If it's a skipped allocation
+        AllocationState allocationState = allocationResult.getAllocationState();
 
-        // Is the node-label-expression of this offswitch resource request
-        // matches the node's label?
-        // If not match, jump to next priority.
-        if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-            anyRequest, node.getPartition(), schedulingMode)) {
+        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
           continue;
         }
-
-        if (!getCSLeafQueue().getReservationContinueLooking()) {
-          if (!shouldAllocOrReserveNewContainer(priority, required)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("doesn't need containers based on reservation algo!");
-            }
-            continue;
-          }
-        }
-
-        if (!checkHeadroom(clusterResource, currentResourceLimits, required,
-            node)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("cannot allocate required resource=" + required
-                + " because of headroom");
-          }
-          return NULL_ASSIGNMENT;
-        }
-
-        // Inform the application it is about to get a scheduling opportunity
-        addSchedulingOpportunity(priority);
-
-        // Increase missed-non-partitioned-resource-request-opportunity.
-        // This is to make sure non-partitioned-resource-request will prefer
-        // to be allocated to non-partitioned nodes
-        int missedNonPartitionedRequestSchedulingOpportunity = 0;
-        if (anyRequest.getNodeLabelExpression().equals(
-            RMNodeLabelsManager.NO_LABEL)) {
-          missedNonPartitionedRequestSchedulingOpportunity =
-              addMissedNonPartitionedRequestSchedulingOpportunity(priority);
-        }
-
-        if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-          // Before doing allocation, we need to check scheduling opportunity to
-          // make sure : non-partitioned resource request should be scheduled to
-          // non-partitioned partition first.
-          if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
-              .getScheduler().getNumClusterNodes()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skip app_attempt="
-                  + getApplicationAttemptId() + " priority="
-                  + priority
-                  + " because missed-non-partitioned-resource-request"
-                  + " opportunity under requred:" + " Now="
-                  + missedNonPartitionedRequestSchedulingOpportunity
-                  + " required="
-                  + rmContext.getScheduler().getNumClusterNodes());
-            }
-
-            return SKIP_ASSIGNMENT;
-          }
-        }
-
-        // Try to schedule
-        CSAssignment assignment =
-            assignContainersOnNode(clusterResource, node,
-                priority, null, schedulingMode, currentResourceLimits);
-
-        // Did the application skip this node?
-        if (assignment.getSkipped()) {
-          // Don't count 'skipped nodes' as a scheduling opportunity!
-          subtractSchedulingOpportunity(priority);
-          continue;
-        }
-
-        // Did we schedule or reserve a container?
-        Resource assigned = assignment.getResource();
-        if (Resources.greaterThan(rc, clusterResource,
-            assigned, Resources.none())) {
-          // Don't reset scheduling opportunities for offswitch assignments
-          // otherwise the app will be delayed for each non-local assignment.
-          // This helps apps with many off-cluster requests schedule faster.
-          if (assignment.getType() != NodeType.OFF_SWITCH) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Resetting scheduling opportunities");
-            }
-            resetSchedulingOpportunities(priority);
-          }
-          // Non-exclusive scheduling opportunity is different: we need reset
-          // it every time to make sure non-labeled resource request will be
-          // most likely allocated on non-labeled nodes first.
-          resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
-
-          // Done
-          return assignment;
-        } else {
-          // Do not assign out of order w.r.t priorities
-          return SKIP_ASSIGNMENT;
-        }
+        return getCSAssignmentFromAllocateResult(clusterResource,
+            allocationResult);
       }
     }
 
-    return SKIP_ASSIGNMENT;
+    // We will reach here if we skipped all priorities of the app, so we will
+    // skip the app.
+    return CSAssignment.SKIP_ASSIGNMENT;
   }
 
 
   public synchronized CSAssignment assignReservedContainer(
       FiCaSchedulerNode node, RMContainer rmContainer,
       Resource clusterResource, SchedulingMode schedulingMode) {
-    // Do we still need this reservation?
-    Priority priority = rmContainer.getReservedPriority();
-    if (getTotalRequiredResources(priority) == 0) {
-      // Release
-      return new CSAssignment(this, rmContainer);
-    }
+    ContainerAllocation result =
+        containerAllocator.allocate(clusterResource, node,
+            schedulingMode, new ResourceLimits(Resources.none()),
+            rmContainer.getReservedPriority(), rmContainer);
 
-    // Try to assign if we have sufficient resources
-    CSAssignment tmp =
-        assignContainersOnNode(clusterResource, node, priority,
-          rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
-
-    // Doesn't matter... since it's already charged for at time of reservation
-    // "re-reservation" is *free*
-    CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-    if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
-      ret.setFulfilledReservation(true);
-    }
-    return ret;
+    return getCSAssignmentFromAllocateResult(clusterResource, result);
   }
-
 }


Mime
View raw message