hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkarana...@apache.org
Subject [2/2] hadoop git commit: YARN-7437. Rename PlacementSet and SchedulingPlacementSet. (Wangda Tan via kkaranasos)
Date Thu, 09 Nov 2017 21:04:02 GMT
YARN-7437. Rename PlacementSet and SchedulingPlacementSet. (Wangda Tan via kkaranasos)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac4d2b10
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac4d2b10
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac4d2b10

Branch: refs/heads/apache-trunk
Commit: ac4d2b1081d8836a21bc70e77f4e6cd2071a9949
Parents: a2c150a
Author: Konstantinos Karanasos <kkaranasos@apache.org>
Authored: Thu Nov 9 13:01:14 2017 -0800
Committer: Konstantinos Karanasos <kkaranasos@apache.org>
Committed: Thu Nov 9 13:01:24 2017 -0800

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java            |  93 ++--
 .../scheduler/ContainerUpdateContext.java       |  15 +-
 .../scheduler/SchedulerApplicationAttempt.java  |  27 +-
 .../scheduler/activities/ActivitiesLogger.java  |   2 +-
 .../scheduler/capacity/AbstractCSQueue.java     |   4 +-
 .../scheduler/capacity/CSQueue.java             |  12 +-
 .../scheduler/capacity/CapacityScheduler.java   |  69 +--
 .../scheduler/capacity/LeafQueue.java           |  39 +-
 .../scheduler/capacity/ParentQueue.java         |  37 +-
 .../allocator/AbstractContainerAllocator.java   |  10 +-
 .../capacity/allocator/ContainerAllocator.java  |  11 +-
 .../allocator/RegularContainerAllocator.java    |  57 +--
 .../scheduler/common/fica/FiCaSchedulerApp.java |  23 +-
 .../scheduler/fair/FSAppAttempt.java            |   2 +-
 .../placement/AppPlacementAllocator.java        | 163 +++++++
 .../scheduler/placement/CandidateNodeSet.java   |  61 +++
 .../placement/CandidateNodeSetUtils.java        |  44 ++
 .../LocalityAppPlacementAllocator.java          | 422 +++++++++++++++++++
 .../LocalitySchedulingPlacementSet.java         | 416 ------------------
 .../scheduler/placement/PlacementSet.java       |  65 ---
 .../scheduler/placement/PlacementSetUtils.java  |  36 --
 .../placement/SchedulingPlacementSet.java       | 158 -------
 .../placement/SimpleCandidateNodeSet.java       |  68 +++
 .../scheduler/placement/SimplePlacementSet.java |  70 ---
 .../scheduler/placement/package-info.java       |  28 ++
 .../capacity/TestCapacityScheduler.java         |   5 +-
 .../scheduler/capacity/TestChildQueueOrder.java |  18 +-
 .../capacity/TestContainerResizing.java         |   7 +-
 .../TestNodeLabelContainerAllocation.java       |   2 +-
 .../scheduler/capacity/TestParentQueue.java     | 119 +++---
 30 files changed, 1082 insertions(+), 1001 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 082ec14..9f49880 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 /**
@@ -82,8 +82,8 @@ public class AppSchedulingInfo {
 
   private final ConcurrentSkipListSet<SchedulerRequestKey>
       schedulerKeys = new ConcurrentSkipListSet<>();
-  final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
-      schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
+  private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>>
+      schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>();
 
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -146,7 +146,7 @@ public class AppSchedulingInfo {
    */
   private void clearRequests() {
     schedulerKeys.clear();
-    schedulerKeyToPlacementSets.clear();
+    schedulerKeyToAppPlacementAllocator.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
@@ -190,9 +190,9 @@ public class AppSchedulingInfo {
         dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
       }
 
-      // Update scheduling placement set
+      // Update AppPlacementAllocator by dedup requests.
       offswitchResourcesUpdated =
-          addToPlacementSets(
+          addRequestToAppPlacement(
               recoverPreemptedRequestForAContainer, dedupRequests);
 
       return offswitchResourcesUpdated;
@@ -201,11 +201,11 @@ public class AppSchedulingInfo {
     }
   }
 
-  public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
-    schedulerKeyToPlacementSets.remove(schedulerRequestKey);
+  public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
+    schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
   }
 
-  boolean addToPlacementSets(
+  boolean addRequestToAppPlacement(
       boolean recoverPreemptedRequestForAContainer,
       Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
     boolean offswitchResourcesUpdated = false;
@@ -213,14 +213,15 @@ public class AppSchedulingInfo {
         dedupRequests.entrySet()) {
       SchedulerRequestKey schedulerRequestKey = entry.getKey();
 
-      if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
-        schedulerKeyToPlacementSets.put(schedulerRequestKey,
-            new LocalitySchedulingPlacementSet<>(this));
+      if (!schedulerKeyToAppPlacementAllocator.containsKey(
+          schedulerRequestKey)) {
+        schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
+            new LocalityAppPlacementAllocator<>(this));
       }
 
-      // Update placement set
+      // Update AppPlacementAllocator
       ResourceRequestUpdateResult pendingAmountChanges =
-          schedulerKeyToPlacementSets.get(schedulerRequestKey)
+          schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
               .updateResourceRequests(
                   entry.getValue().values(),
                   recoverPreemptedRequestForAContainer);
@@ -244,7 +245,7 @@ public class AppSchedulingInfo {
     if (request.getNumContainers() <= 0) {
       if (lastRequestContainers >= 0) {
         schedulerKeys.remove(schedulerKey);
-        schedulerKeyToPlacementSets.remove(schedulerKey);
+        schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
       }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
@@ -356,8 +357,9 @@ public class AppSchedulingInfo {
     List<ResourceRequest> ret = new ArrayList<>();
     try {
       this.readLock.lock();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        ret.addAll(ps.getResourceRequests().values());
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        ret.addAll(ap.getResourceRequests().values());
       }
     } finally {
       this.readLock.unlock();
@@ -384,8 +386,9 @@ public class AppSchedulingInfo {
       String resourceName) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName);
+      AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
+          schedulerKey);
+      return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -424,7 +427,7 @@ public class AppSchedulingInfo {
         updateMetricsForAllocatedContainer(type, node, containerAllocated);
       }
 
-      return schedulerKeyToPlacementSets.get(schedulerKey).allocate(
+      return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate(
           schedulerKey, type, node);
     } finally {
       writeLock.unlock();
@@ -442,23 +445,24 @@ public class AppSchedulingInfo {
       this.writeLock.lock();
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
         if (ask.getCount() > 0) {
           oldMetrics.decrPendingResources(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
           newMetrics.incrPendingResources(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
 
           Resource delta = Resources.multiply(ask.getPerAllocationResource(),
               ask.getCount());
           // Update Queue
           queue.decPendingResource(
-              ps.getPrimaryRequestedNodePartition(), delta);
+              ap.getPrimaryRequestedNodePartition(), delta);
           newQueue.incPendingResource(
-              ps.getPrimaryRequestedNodePartition(), delta);
+              ap.getPrimaryRequestedNodePartition(), delta);
         }
       }
       oldMetrics.moveAppFrom(this);
@@ -477,15 +481,16 @@ public class AppSchedulingInfo {
     try {
       this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
-      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
-        PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
+      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
+          .values()) {
+        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
         if (ask.getCount() > 0) {
-          metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(),
+          metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(),
               user, ask.getCount(), ask.getPerAllocationResource());
 
           // Update Queue
           queue.decPendingResource(
-              ps.getPrimaryRequestedNodePartition(),
+              ap.getPrimaryRequestedNodePartition(),
               Resources.multiply(ask.getPerAllocationResource(),
                   ask.getCount()));
         }
@@ -559,11 +564,12 @@ public class AppSchedulingInfo {
       SchedulerRequestKey schedulerKey) {
     try {
       readLock.lock();
-      SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
-      if (null == ps) {
+      AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
+          schedulerKey);
+      if (null == ap) {
         return false;
       }
-      return ps.canAllocate(type, node);
+      return ap.canAllocate(type, node);
     } finally {
       readLock.unlock();
     }
@@ -593,11 +599,10 @@ public class AppSchedulingInfo {
     metrics.incrNodeTypeAggregations(user, type);
   }
 
-  // Get placement-set by specified schedulerKey
-  // Now simply return all node of the input clusterPlacementSet
-  public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
+  // Get AppPlacementAllocator by specified schedulerKey
+  public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
       SchedulerRequestKey schedulerkey) {
-    return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
+    return (AppPlacementAllocator<N>) schedulerKeyToAppPlacementAllocator.get(
         schedulerkey);
   }
 
@@ -614,9 +619,9 @@ public class AppSchedulingInfo {
       SchedulerRequestKey schedulerKey, String resourceName) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps == null) || ps.canDelayTo(resourceName);
+      AppPlacementAllocator ap =
+          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
+      return (ap == null) || ap.canDelayTo(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -626,9 +631,9 @@ public class AppSchedulingInfo {
       String nodePartition, SchedulingMode schedulingMode) {
     try {
       this.readLock.lock();
-      SchedulingPlacementSet ps =
-          schedulerKeyToPlacementSets.get(schedulerKey);
-      return (ps != null) && ps.acceptNodePartition(nodePartition,
+      AppPlacementAllocator ap =
+          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
+      return (ap != null) && ap.acceptNodePartition(nodePartition,
           schedulingMode);
     } finally {
       this.readLock.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 5ac2ac5..93995a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -34,8 +34,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
-    .SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -146,17 +145,17 @@ public class ContainerUpdateContext {
           createResourceRequests(rmContainer, schedulerNode,
               schedulerKey, resToIncrease);
       updateResReqs.put(schedulerKey, resMap);
-      appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+      appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs);
     }
     return true;
   }
 
   private void cancelPreviousRequest(SchedulerNode schedulerNode,
       SchedulerRequestKey schedulerKey) {
-    SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
-        appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
-    if (schedulingPlacementSet != null) {
-      Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
+    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
+        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
+    if (appPlacementAllocator != null) {
+      Map<String, ResourceRequest> resourceRequests = appPlacementAllocator
           .getResourceRequests();
       ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
       // Decrement the pending using a dummy RR with
@@ -290,7 +289,7 @@ public class ContainerUpdateContext {
           (rmContainer, node, schedulerKey,
           rmContainer.getContainer().getResource());
       reqsToUpdate.put(schedulerKey, resMap);
-      appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
+      appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate);
       return UNDEFINED;
     }
     return retVal;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index ce71afa..346bd20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.collect.ConcurrentHashMultiset;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang.time.FastDateFormat;
@@ -75,14 +74,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -91,6 +87,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ConcurrentHashMultiset;
 
 /**
  * Represents an application attempt from the viewpoint of the scheduler.
@@ -316,9 +313,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       String resourceName) {
     try {
       readLock.lock();
-      SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet(
+      AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator(
           schedulerKey);
-      return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName);
+      return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName);
     } finally {
       readLock.unlock();
     }
@@ -617,13 +614,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       try {
         readLock.lock();
         for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-          SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey);
-          if (ps != null &&
-              ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
+          AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey);
+          if (ap != null &&
+              ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " headRoom=" + getHeadroom() + " currentConsumption="
                 + attemptResourceUsage.getUsed().getMemorySize());
-            ps.showRequests();
+            ap.showRequests();
           }
         }
       } finally {
@@ -1334,14 +1331,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     this.isAttemptRecovering = isRecovering;
   }
 
-  public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
+  public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
       SchedulerRequestKey schedulerRequestKey) {
-    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
+    return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey);
   }
 
   public Map<String, ResourceRequest> getResourceRequests(
       SchedulerRequestKey schedulerRequestKey) {
-    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey)
+    return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey)
         .getResourceRequests();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 12aff02..0c351b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 /**
  * Utility for logging scheduler activities
  */
-// FIXME: make sure PlacementSet works with this class
+// FIXME: make sure CandidateNodeSet works with this class
 public class ActivitiesLogger {
   private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 250f4e6..183cb36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -876,7 +876,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode) {
-    return assignContainers(clusterResource, new SimplePlacementSet<>(node),
+    return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node),
         resourceLimits, schedulingMode);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 3a17d1b..43e7f53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -46,12 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 
 /**
  * <code>CSQueue</code> represents a node in the tree of 
@@ -188,15 +185,16 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
-   * @param ps {@link PlacementSet} of nodes which resources are available
+   * @param candidates {@link CandidateNodeSet} the nodes that are considered
+   *                   for the current placement.
    * @param resourceLimits how much overall resource of this queue can use. 
    * @param schedulingMode Type of exclusive check when assign container on a 
    * NodeManager, see {@link SchedulingMode}.
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
-      SchedulingMode schedulingMode);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits resourceLimits, SchedulingMode schedulingMode);
   
   /**
    * A container assigned to the queue has completed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5e172b8..6f630f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -132,9 +132,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -1183,7 +1183,7 @@ public class CapacityScheduler extends
 
   /**
    * We need to make sure when doing allocation, Node should be existed
-   * And we will construct a {@link PlacementSet} before proceeding
+   * And we will construct a {@link CandidateNodeSet} before proceeding
    */
   private void allocateContainersToNode(NodeId nodeId,
       boolean withNodeHeartbeat) {
@@ -1192,8 +1192,10 @@ public class CapacityScheduler extends
       int offswitchCount = 0;
       int assignedContainers = 0;
 
-      PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
-      CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
+      CandidateNodeSet<FiCaSchedulerNode> candidates =
+          new SimpleCandidateNodeSet<>(node);
+      CSAssignment assignment = allocateContainersToNode(candidates,
+          withNodeHeartbeat);
       // Only check if we can allocate more container on the same node when
       // scheduling is triggered by node heartbeat
       if (null != assignment && withNodeHeartbeat) {
@@ -1210,7 +1212,7 @@ public class CapacityScheduler extends
             assignedContainers)) {
           // Try to see if it is possible to allocate multiple container for
           // the same node heartbeat
-          assignment = allocateContainersToNode(ps, true);
+          assignment = allocateContainersToNode(candidates, true);
 
           if (null != assignment
               && assignment.getType() == NodeType.OFF_SWITCH) {
@@ -1237,8 +1239,9 @@ public class CapacityScheduler extends
   /*
    * Logics of allocate container on a single node (Old behavior)
    */
-  private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps,
-      FiCaSchedulerNode node, boolean withNodeHeartbeat) {
+  private CSAssignment allocateContainerOnSingleNode(
+      CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
+      boolean withNodeHeartbeat) {
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
     if (getNode(node.getNodeID()) != node) {
@@ -1262,7 +1265,7 @@ public class CapacityScheduler extends
               .getApplicationId() + " on node: " + node.getNodeID());
 
       LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
-      assignment = queue.assignContainers(getClusterResource(), ps,
+      assignment = queue.assignContainers(getClusterResource(), candidates,
           // TODO, now we only consider limits for parent for non-labeled
           // resources, should consider labeled resources as well.
           new ResourceLimits(labelManager
@@ -1329,14 +1332,16 @@ public class CapacityScheduler extends
               + node.getUnallocatedResource());
     }
 
-    return allocateOrReserveNewContainers(ps, withNodeHeartbeat);
+    return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
   }
 
   private CSAssignment allocateOrReserveNewContainers(
-      PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      boolean withNodeHeartbeat) {
     CSAssignment assignment = getRootQueue().assignContainers(
-        getClusterResource(), ps, new ResourceLimits(labelManager
-            .getResourceByLabel(ps.getPartition(), getClusterResource())),
+        getClusterResource(), candidates, new ResourceLimits(labelManager
+            .getResourceByLabel(candidates.getPartition(),
+                getClusterResource())),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
     assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@@ -1346,30 +1351,34 @@ public class CapacityScheduler extends
         assignment.getResource(), Resources.none())) {
       if (withNodeHeartbeat) {
         updateSchedulerHealth(lastNodeUpdateTime,
-            PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment);
+            CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
+            assignment);
       }
       return assignment;
     }
 
     // Only do non-exclusive allocation when node has node-labels.
-    if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
+    if (StringUtils.equals(candidates.getPartition(),
+        RMNodeLabelsManager.NO_LABEL)) {
       return null;
     }
 
     // Only do non-exclusive allocation when the node-label supports that
     try {
       if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
-          ps.getPartition())) {
+          candidates.getPartition())) {
         return null;
       }
     } catch (IOException e) {
-      LOG.warn("Exception when trying to get exclusivity of node label=" + ps
+      LOG.warn(
+          "Exception when trying to get exclusivity of node label=" + candidates
           .getPartition(), e);
       return null;
     }
 
     // Try to use NON_EXCLUSIVE
-    assignment = getRootQueue().assignContainers(getClusterResource(), ps,
+    assignment = getRootQueue().assignContainers(getClusterResource(),
+        candidates,
         // TODO, now we only consider limits for parent for non-labeled
         // resources, should consider labeled resources as well.
         new ResourceLimits(labelManager
@@ -1386,13 +1395,14 @@ public class CapacityScheduler extends
    * New behavior, allocate containers considering multiple nodes
    */
   private CSAssignment allocateContainersOnMultiNodes(
-      PlacementSet<FiCaSchedulerNode> ps) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates) {
     // When this time look at multiple nodes, try schedule if the
     // partition has any available resource or killable resource
     if (getRootQueue().getQueueCapacities().getUsedCapacity(
-        ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
-        CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
-        .none()) {
+        candidates.getPartition()) >= 1.0f
+        && preemptionManager.getKillableResource(
+        CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
+        == Resources.none()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("This node or this node partition doesn't have available or"
             + "killable resource");
@@ -1400,11 +1410,12 @@ public class CapacityScheduler extends
       return null;
     }
 
-    return allocateOrReserveNewContainers(ps, false);
+    return allocateOrReserveNewContainers(candidates, false);
   }
 
   @VisibleForTesting
-  CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps,
+  CSAssignment allocateContainersToNode(
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       boolean withNodeHeartbeat) {
     if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
         .isSchedulerReadyForAllocatingContainers()) {
@@ -1413,14 +1424,14 @@ public class CapacityScheduler extends
 
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     // We have two different logics to handle allocation on single node / multi
     // nodes.
     if (null != node) {
-      return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat);
-    } else {
-      return allocateContainersOnMultiNodes(ps);
+      return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
+    } else{
+      return allocateContainersOnMultiNodes(candidates);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f2f1baf..e8342d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -69,8 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -970,10 +970,10 @@ public class LeafQueue extends AbstractCSQueue {
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }
 
-  private CSAssignment allocateFromReservedContainer(
-      Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps,
+  private CSAssignment allocateFromReservedContainer(Resource clusterResource,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
     if (null == node) {
       return null;
     }
@@ -987,7 +987,8 @@ public class LeafQueue extends AbstractCSQueue {
         ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
             node.getNodeID(), SystemClock.getInstance().getTime(), application);
         CSAssignment assignment = application.assignContainers(clusterResource,
-            ps, currentResourceLimits, schedulingMode, reservedContainer);
+            candidates, currentResourceLimits, schedulingMode,
+            reservedContainer);
         return assignment;
       }
     }
@@ -997,43 +998,44 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
-    SchedulingMode schedulingMode) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: partition=" + ps.getPartition()
+      LOG.debug("assignContainers: partition=" + candidates.getPartition()
           + " #applications=" + orderingPolicy.getNumSchedulableEntities());
     }
 
-    setPreemptionAllowed(currentResourceLimits, ps.getPartition());
+    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
 
     // Check for reserved resources, try to allocate reserved container first.
     CSAssignment assignment = allocateFromReservedContainer(clusterResource,
-        ps, currentResourceLimits, schedulingMode);
+        candidates, currentResourceLimits, schedulingMode);
     if (null != assignment) {
       return assignment;
     }
 
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(ps.getPartition())) {
+        && !accessibleToPartition(candidates.getPartition())) {
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates
               .getPartition());
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(ps.getPartition(), clusterResource,
+    if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
         schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + ps.getPartition());
+            + schedulingMode.name() + " node-partition=" + candidates
+            .getPartition());
       }
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
@@ -1078,7 +1080,8 @@ public class LeafQueue extends AbstractCSQueue {
         cachedUserLimit = cul.userLimit;
       }
       Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit);
+          clusterResource, candidates.getPartition(), schedulingMode,
+          cachedUserLimit);
       if (cul == null) {
         cul = new CachedUserLimit(userLimit);
         userLimits.put(application.getUser(), cul);
@@ -1106,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Try to schedule
       assignment = application.assignContainers(clusterResource,
-          ps, currentResourceLimits, schedulingMode, null);
+          candidates, currentResourceLimits, schedulingMode, null);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("post-assignContainers for application " + application

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 2c288f2..d61951b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -479,16 +479,16 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
-    SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      ResourceLimits resourceLimits, SchedulingMode schedulingMode) {
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(ps.getPartition())) {
+        && !accessibleToPartition(candidates.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it is not able to access partition=" + ps
+            + ", because it is not able to access partition=" + candidates
             .getPartition());
       }
 
@@ -506,12 +506,12 @@ public class ParentQueue extends AbstractCSQueue {
 
     // Check if this queue need more resource, simply skip allocation if this
     // queue doesn't need more resources.
-    if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource,
-        schedulingMode)) {
+    if (!super.hasPendingResourceRequest(candidates.getPartition(),
+        clusterResource, schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skip this queue=" + getQueuePath()
             + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + ps
+            + schedulingMode.name() + " node-partition=" + candidates
             .getPartition());
       }
 
@@ -538,7 +538,8 @@ public class ParentQueue extends AbstractCSQueue {
       // Are we over maximum-capacity for this queue?
       // This will also consider parent's limits and also continuous reservation
       // looking
-      if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
+      if (!super.canAssignToThisQueue(clusterResource,
+          candidates.getPartition(),
           resourceLimits, Resources
               .createResource(getMetrics().getReservedMB(),
                   getMetrics().getReservedVirtualCores()), schedulingMode)) {
@@ -556,7 +557,7 @@ public class ParentQueue extends AbstractCSQueue {
 
       // Schedule
       CSAssignment assignedToChild = assignContainersToChildQueues(
-          clusterResource, ps, resourceLimits, schedulingMode);
+          clusterResource, candidates, resourceLimits, schedulingMode);
       assignment.setType(assignedToChild.getType());
       assignment.setRequestLocalityType(
           assignedToChild.getRequestLocalityType());
@@ -710,7 +711,7 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   private CSAssignment assignContainersToChildQueues(Resource cluster,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits,
+      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
       SchedulingMode schedulingMode) {
     CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
 
@@ -719,7 +720,7 @@ public class ParentQueue extends AbstractCSQueue {
 
     // Try to assign to most 'under-served' sub-queue
     for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
-        ps.getPartition()); iter.hasNext(); ) {
+        candidates.getPartition()); iter.hasNext(); ) {
       CSQueue childQueue = iter.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@@ -729,10 +730,10 @@ public class ParentQueue extends AbstractCSQueue {
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, parentLimits,
-              ps.getPartition());
-      
-      CSAssignment childAssignment = childQueue.assignContainers(cluster, ps,
-          childLimits, schedulingMode);
+              candidates.getPartition());
+
+      CSAssignment childAssignment = childQueue.assignContainers(cluster,
+          candidates, childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
             " stats: " + childQueue + " --> " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index 5809d86..95e0533 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -34,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -176,13 +175,14 @@ public abstract class AbstractContainerAllocator {
    * </ul>
    *
    * @param clusterResource clusterResource
-   * @param ps PlacementSet
+   * @param candidates CandidateNodeSet
    * @param schedulingMode scheduling mode (exclusive or nonexclusive)
    * @param resourceLimits resourceLimits
    * @param reservedContainer reservedContainer
    * @return CSAssignemnt proposal
    */
   public abstract CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer);
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      RMContainer reservedContainer);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 4879fae..9df03b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerAllocator extends AbstractContainerAllocator {
   private AbstractContainerAllocator regularContainerAllocator;
@@ -50,10 +48,11 @@ public class ContainerAllocator extends AbstractContainerAllocator {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      RMContainer reservedContainer) {
     return regularContainerAllocator.assignContainers(clusterResource,
-        ps, schedulingMode, resourceLimits, reservedContainer);
+        candidates, schedulingMode, resourceLimits, reservedContainer);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 72dfbdd..69e90c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
@@ -50,9 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -91,15 +91,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
   /*
    * Pre-check if we can allocate a pending resource request
-   * (given schedulerKey) to a given PlacementSet.
+   * (given schedulerKey) to a given CandidateNodeSet.
    * We will consider stuffs like exclusivity, pending resource, node partition,
    * headroom, etc.
    */
-  private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
+  private ContainerAllocation preCheckForNodeCandidateSet(
+      Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      SchedulerRequestKey schedulerKey) {
     Priority priority = schedulerKey.getPriority();
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
         ResourceRequest.ANY);
@@ -164,7 +165,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     if (!checkHeadroom(clusterResource, resourceLimits, required,
-        ps.getPartition())) {
+        candidates.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
@@ -182,7 +183,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Only do this when request associated with given scheduler key accepts
     // NO_LABEL under RESPECT_EXCLUSIVITY mode
     if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
-        appInfo.getSchedulingPlacementSet(schedulerKey)
+        appInfo.getAppPlacementAllocator(schedulerKey)
             .getPrimaryRequestedNodePartition())) {
       missedNonPartitionedRequestSchedulingOpportunity =
           application.addMissedNonPartitionedRequestSchedulingOpportunity(
@@ -265,7 +266,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = Math.max(
-        application.getSchedulingPlacementSet(schedulerKey)
+        application.getAppPlacementAllocator(schedulerKey)
             .getUniqueLocationAsks() - 1, 0);
     
     // waitFactor can't be more than '1' 
@@ -780,15 +781,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   }
 
   private ContainerAllocation allocate(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
-      RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      SchedulerRequestKey schedulerKey, RMContainer reservedContainer) {
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
     if (reservedContainer == null) {
-      result = preCheckForPlacementSet(clusterResource, ps, schedulingMode,
-          resourceLimits, schedulerKey);
+      result = preCheckForNodeCandidateSet(clusterResource, candidates,
+          schedulingMode, resourceLimits, schedulerKey);
       if (null != result) {
         return result;
       }
@@ -801,14 +802,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
     }
 
-    SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
-        application.getAppSchedulingInfo().getSchedulingPlacementSet(
+    AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
+        application.getAppSchedulingInfo().getAppPlacementAllocator(
             schedulerKey);
 
     result = ContainerAllocation.PRIORITY_SKIPPED;
 
     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
-        ps);
+        candidates);
     while (iter.hasNext()) {
       FiCaSchedulerNode node = iter.next();
 
@@ -827,19 +828,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
   
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits,
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
       RMContainer reservedContainer) {
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     if (reservedContainer == null) {
       // Check if application needs more resource, skip if it doesn't need more.
       if (!application.hasPendingResourceRequest(rc,
-          ps.getPartition(), clusterResource, schedulingMode)) {
+          candidates.getPartition(), clusterResource, schedulingMode)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
               + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-label=" + ps.getPartition());
+              + schedulingMode.name() + " node-label=" + candidates
+              .getPartition());
         }
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
             activitiesManager, node, application, application.getPriority(),
@@ -849,9 +851,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       
       // Schedule in priority order
       for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
-        ContainerAllocation result =
-            allocate(clusterResource, ps, schedulingMode, resourceLimits,
-                schedulerKey, null);
+        ContainerAllocation result = allocate(clusterResource, candidates,
+            schedulingMode, resourceLimits, schedulerKey, null);
 
         AllocationState allocationState = result.getAllocationState();
         if (allocationState == AllocationState.PRIORITY_SKIPPED) {
@@ -869,7 +870,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       return CSAssignment.SKIP_ASSIGNMENT;
     } else {
       ContainerAllocation result =
-          allocate(clusterResource, ps, schedulingMode, resourceLimits,
+          allocate(clusterResource, candidates, schedulingMode, resourceLimits,
               reservedContainer.getReservedSchedulerKey(), reservedContainer);
       return getCSAssignmentFromAllocateResult(clusterResource, result,
           reservedContainer, node);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index a12c5ec..40405fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -76,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -224,10 +224,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         return null;
       }
 
-      SchedulingPlacementSet<FiCaSchedulerNode> ps =
-          appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+      AppPlacementAllocator<FiCaSchedulerNode> ps =
+          appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
       if (null == ps) {
-        LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+        LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
             + " for application=" + getApplicationId() + " schedulerRequestKey="
             + schedulerKey);
         return null;
@@ -636,8 +636,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       Map<String, Resource> ret = new HashMap<>();
       for (SchedulerRequestKey schedulerKey : appSchedulingInfo
           .getSchedulerKeys()) {
-        SchedulingPlacementSet<FiCaSchedulerNode> ps =
-            appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+        AppPlacementAllocator<FiCaSchedulerNode> ps =
+            appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
 
         String nodePartition = ps.getPrimaryRequestedNodePartition();
         Resource res = ret.get(nodePartition);
@@ -844,8 +844,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode, RMContainer reservedContainer) {
+      CandidateNodeSet<FiCaSchedulerNode> ps,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode,
+      RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
           + getApplicationId());
@@ -962,9 +963,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   @Override
   @SuppressWarnings("unchecked")
-  public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
+  public AppPlacementAllocator<FiCaSchedulerNode> getAppPlacementAllocator(
       SchedulerRequestKey schedulerRequestKey) {
-    return super.getSchedulingPlacementSet(schedulerRequestKey);
+    return super.getAppPlacementAllocator(schedulerRequestKey);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 157d264..bbd4418 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -1019,7 +1019,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         }
 
         if (offswitchAsk.getCount() > 0) {
-          if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks()
+          if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
               <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Assign container on " + node.getNodeName()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
new file mode 100644
index 0000000..63b22a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ * This class has the following functionality:
+ * 1) Keeps track of pending resource requests when following events happen:
+ * - New ResourceRequests are added to scheduler.
+ * - New containers get allocated.
+ *
+ * 2) Determines the order that the nodes given in the {@link CandidateNodeSet}
+ * will be used for allocating containers.
+ * </p>
+ *
+ * <p>
+ * And different set of resource requests (E.g., resource requests with the
+ * same schedulerKey) can have one instance of AppPlacementAllocator, each
+ * AppPlacementAllocator can have different ways to order nodes depends on
+ * requests.
+ * </p>
+ */
+public interface AppPlacementAllocator<N extends SchedulerNode> {
+  /**
+   * Get iterator of preferred node depends on requirement and/or availability
+   * @param candidateNodeSet input CandidateNodeSet
+   * @return iterator of preferred node
+   */
+  Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
+
+  /**
+   * Replace existing ResourceRequest by the new requests
+   *
+   * @param requests new ResourceRequests
+   * @param recoverPreemptedRequestForAContainer if we're recovering resource
+   * requests for preempted container
+   * @return true if total pending resource changed
+   */
+  ResourceRequestUpdateResult updateResourceRequests(
+      Collection<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer);
+
+  /**
+   * Get pending ResourceRequests by given schedulerRequestKey
+   * @return Map of resourceName to ResourceRequest
+   */
+  Map<String, ResourceRequest> getResourceRequests();
+
+  /**
+   * Get pending ask for given resourceName. If there's no such pendingAsk,
+   * returns {@link PendingAsk#ZERO}
+   *
+   * @param resourceName resourceName
+   * @return PendingAsk
+   */
+  PendingAsk getPendingAsk(String resourceName);
+
+  /**
+   * Get #pending-allocations for given resourceName. If there's no such
+   * pendingAsk, returns 0
+   *
+   * @param resourceName resourceName
+   * @return #pending-allocations
+   */
+  int getOutstandingAsksCount(String resourceName);
+
+  /**
+   * Notify container allocated.
+   * @param schedulerKey SchedulerRequestKey for this ResourceRequest
+   * @param type Type of the allocation
+   * @param node Which node this container allocated on
+   * @return list of ResourceRequests deducted
+   */
+  List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node);
+
+  /**
+   * Returns list of accepted resourceNames.
+   * @return Iterator of accepted resourceNames
+   */
+  Iterator<String> getAcceptedResouceNames();
+
+  /**
+   * We can still have pending requirement for a given NodeType and node
+   * @param type Locality Type
+   * @param node which node we will allocate on
+   * @return true if we has pending requirement
+   */
+  boolean canAllocate(NodeType type, SchedulerNode node);
+
+  /**
+   * Can delay to give locality?
+   * TODO: This should be moved out of AppPlacementAllocator
+   * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
+   *
+   * @param resourceName resourceName
+   * @return can/cannot
+   */
+  boolean canDelayTo(String resourceName);
+
+  /**
+   * Does this {@link AppPlacementAllocator} accept resources on nodePartition?
+   *
+   * @param nodePartition nodePartition
+   * @param schedulingMode schedulingMode
+   * @return accepted/not
+   */
+  boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode);
+
+  /**
+   * It is possible that one request can accept multiple node partition,
+   * So this method returns primary node partition for pending resource /
+   * headroom calculation.
+   *
+   * @return primary requested node partition
+   */
+  String getPrimaryRequestedNodePartition();
+
+  /**
+   * @return number of unique location asks with #pending greater than 0,
+   * (like /rack1, host1, etc.).
+   *
+   * TODO: This should be moved out of AppPlacementAllocator
+   * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
+   */
+  int getUniqueLocationAsks();
+
+  /**
+   * Print human-readable requests to LOG debug.
+   */
+  void showRequests();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
new file mode 100644
index 0000000..6f127c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Map;
+
+/**
+ * A group of nodes which can be allocated by scheduler.
+ *
+ * It will have following part:
+ *
+ * 1) A map of nodes which can be schedule-able.
+ * 2) Version of the node set, version should be updated if any node added or
+ *    removed from the node set. This will be used by
+ *    {@link AppPlacementAllocator} or other class to check if it's required to
+ *    invalidate local caches, etc.
+ * 3) Node partition of the candidate set.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface CandidateNodeSet<N extends SchedulerNode> {
+  /**
+   * Get all nodes for this CandidateNodeSet
+   * @return all nodes for this CandidateNodeSet
+   */
+  Map<NodeId, N> getAllNodes();
+
+  /**
+   * Version of the CandidateNodeSet, can help {@link AppPlacementAllocator} to
+   * decide if update is required
+   * @return version
+   */
+  long getVersion();
+
+  /**
+   * Node partition of the node set.
+   * @return node partition
+   */
+  String getPartition();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
new file mode 100644
index 0000000..157d2ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Utility methods for {@link CandidateNodeSet}.
+ */
+public final class CandidateNodeSetUtils {
+
+  private CandidateNodeSetUtils() {
+  }
+
+  /*
+   * If the {@link CandidateNodeSet} only has one entry, return it. Otherwise,
+   * return null.
+   */
+  public static <N extends SchedulerNode> N getSingleNode(
+      CandidateNodeSet<N> candidates) {
+    N node = null;
+    if (1 == candidates.getAllNodes().size()) {
+      node = candidates.getAllNodes().values().iterator().next();
+    }
+
+    return node;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message