hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [04/50] [abbrv] hadoop git commit: YARN-2498. Respect labels in preemption policy of capacity scheduler for inter-queue preemption. Contributed by Wangda Tan
Date Fri, 01 May 2015 07:01:21 GMT
YARN-2498. Respect labels in preemption policy of capacity scheduler for inter-queue preemption. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d497f6ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d497f6ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d497f6ea

Branch: refs/heads/HDFS-7240
Commit: d497f6ea2be559aa31ed76f37ae949dbfabe2a51
Parents: dcc5455
Author: Jian He <jianhe@apache.org>
Authored: Fri Apr 24 17:03:13 2015 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Fri Apr 24 17:03:13 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |    3 +
 .../ProportionalCapacityPreemptionPolicy.java   |  585 +++++----
 .../rmcontainer/RMContainerImpl.java            |   28 +-
 .../scheduler/capacity/CapacityScheduler.java   |    2 +-
 .../scheduler/capacity/LeafQueue.java           |   70 +-
 .../scheduler/common/AssignmentInformation.java |   31 +-
 ...estProportionalCapacityPreemptionPolicy.java |   94 +-
 ...pacityPreemptionPolicyForNodePartitions.java | 1211 ++++++++++++++++++
 .../scheduler/capacity/TestChildQueueOrder.java |    2 +-
 .../scheduler/capacity/TestLeafQueue.java       |    4 +-
 .../TestNodeLabelContainerAllocation.java       |   16 +
 .../scheduler/capacity/TestParentQueue.java     |    2 +-
 12 files changed, 1750 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 44b87e5..a830771 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -102,6 +102,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
 
+    YARN-2498. Respect labels in preemption policy of capacity scheduler for
+    inter-queue preemption. (Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 2ab4197..1f47b5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,11 +27,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.TreeSet;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -49,7 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 
 /**
  * This class implement a {@link SchedulingEditPolicy} that is designed to be
@@ -130,7 +132,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
-  private Map<NodeId, Set<String>> labels;
+  private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
+      new HashMap<>();
+  private RMNodeLabelsManager nlm;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -170,7 +174,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
-    labels = null;
+    nlm = scheduler.getRMContext().getNodeLabelManager();
   }
   
   @VisibleForTesting
@@ -182,34 +186,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public void editSchedule() {
     CSQueue root = scheduler.getRootQueue();
     Resource clusterResources = Resources.clone(scheduler.getClusterResource());
-    clusterResources = getNonLabeledResources(clusterResources);
-    setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
-        .getNodeLabels());
     containerBasedPreemptOrKill(root, clusterResources);
   }
-
-  /**
-   * Setting Node Labels
-   * 
-   * @param nodelabels
-   */
-  public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
-    labels = nodelabels;
-  }
-
-  /**
-   * This method returns all non labeled resources.
-   * 
-   * @param clusterResources
-   * @return Resources
-   */
-  private Resource getNonLabeledResources(Resource clusterResources) {
-    RMContext rmcontext = scheduler.getRMContext();
-    RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
-    Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
-        clusterResources);
-    return res == null ? clusterResources : res;
-  }
   
   /**
    * This method selects and tracks containers to be preempted. If a container
@@ -220,28 +198,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    */
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
+    // All partitions to look at
+    Set<String> allPartitions = new HashSet<>();
+    allPartitions.addAll(scheduler.getRMContext()
+        .getNodeLabelManager().getClusterNodeLabelNames());
+    allPartitions.add(RMNodeLabelsManager.NO_LABEL);
 
     // extract a summary of the queues from scheduler
-    TempQueue tRoot;
     synchronized (scheduler) {
-      tRoot = cloneQueues(root, clusterResources);
+      queueToPartitions.clear();
+
+      for (String partitionToLookAt : allPartitions) {
+        cloneQueues(root,
+            nlm.getResourceByLabel(partitionToLookAt, clusterResources),
+            partitionToLookAt);
+      }
     }
 
-    // compute the ideal distribution of resources among queues
-    // updates cloned queues state accordingly
-    tRoot.idealAssigned = tRoot.guaranteed;
+    // compute total preemption allowed
     Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
         percentageClusterPreemptionAllowed);
-    List<TempQueue> queues =
-      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+
+    Set<String> leafQueueNames = null;
+    for (String partition : allPartitions) {
+      TempQueuePerPartition tRoot =
+          getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+      // compute the ideal distribution of resources among queues
+      // updates cloned queues state accordingly
+      tRoot.idealAssigned = tRoot.guaranteed;
+
+      leafQueueNames =
+          recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+    }
 
     // based on ideal allocation select containers to be preempted from each
     // queue and each application
     Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
-        getContainersToPreempt(queues, clusterResources);
+        getContainersToPreempt(leafQueueNames, clusterResources);
 
     if (LOG.isDebugEnabled()) {
-      logToCSV(queues);
+      logToCSV(new ArrayList<String>(leafQueueNames));
     }
 
     // if we are in observeOnly mode return before any action is taken
@@ -252,6 +248,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
          : toPreempt.entrySet()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Send to scheduler: in app=" + e.getKey()
+            + " #containers-to-be-preempted=" + e.getValue().size());
+      }
       for (RMContainer container : e.getValue()) {
         // if we tried to preempt this for more than maxWaitTime
         if (preempted.get(container) != null &&
@@ -291,23 +291,24 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param totalPreemptionAllowed maximum amount of preemption allowed
    * @return a list of leaf queues updated with preemption targets
    */
-  private List<TempQueue> recursivelyComputeIdealAssignment(
-      TempQueue root, Resource totalPreemptionAllowed) {
-    List<TempQueue> leafs = new ArrayList<TempQueue>();
+  private Set<String> recursivelyComputeIdealAssignment(
+      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
+    Set<String> leafQueueNames = new HashSet<>();
     if (root.getChildren() != null &&
         root.getChildren().size() > 0) {
       // compute ideal distribution at this level
       computeIdealResourceDistribution(rc, root.getChildren(),
           totalPreemptionAllowed, root.idealAssigned);
       // compute recursively for lower levels and build list of leafs
-      for(TempQueue t : root.getChildren()) {
-        leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
+      for(TempQueuePerPartition t : root.getChildren()) {
+        leafQueueNames.addAll(recursivelyComputeIdealAssignment(t,
+            totalPreemptionAllowed));
       }
     } else {
       // we are in a leaf nothing to do, just return yourself
-      return Collections.singletonList(root);
+      return ImmutableSet.of(root.queueName);
     }
-    return leafs;
+    return leafQueueNames;
   }
 
   /**
@@ -324,20 +325,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param tot_guarant the amount of capacity assigned to this pool of queues
    */
   private void computeIdealResourceDistribution(ResourceCalculator rc,
-      List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
+      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
+      Resource tot_guarant) {
 
     // qAlloc tracks currently active queues (will decrease progressively as
     // demand is met)
-    List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+    List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues);
     // unassigned tracks how much resources are still to assign, initialized
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
 
     // group queues based on whether they have non-zero guaranteed capacity
-    Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
-    Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>();
+    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>();
 
-    for (TempQueue q : qAlloc) {
+    for (TempQueuePerPartition q : qAlloc) {
       if (Resources
           .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
         nonZeroGuarQueues.add(q);
@@ -361,7 +363,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // based on ideal assignment computed above and current assignment we derive
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
-    for (TempQueue t:queues) {
+    for (TempQueuePerPartition t:queues) {
       if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
         Resources.addTo(totPreemptionNeeded,
             Resources.subtract(t.current, t.idealAssigned));
@@ -379,12 +381,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
     // assign to each queue the amount of actual preemption based on local
     // information of ideal preemption and scaling factor
-    for (TempQueue t : queues) {
+    for (TempQueuePerPartition t : queues) {
       t.assignPreemption(scalingFactor, rc, tot_guarant);
     }
     if (LOG.isDebugEnabled()) {
       long time = clock.getTime();
-      for (TempQueue t : queues) {
+      for (TempQueuePerPartition t : queues) {
         LOG.debug(time + ": " + t);
       }
     }
@@ -400,8 +402,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * distributed uniformly.
    */
   private void computeFixpointAllocation(ResourceCalculator rc,
-      Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, 
-      boolean ignoreGuarantee) {
+      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
+      Resource unassigned, boolean ignoreGuarantee) {
     // Prior to assigning the unused resources, process each queue as follows:
     // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
     // Else idealAssigned = current;
@@ -410,10 +412,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // idealAssigned >= current + pending), remove it from consideration.
     // Sort queues from most under-guaranteed to most over-guaranteed.
     TQComparator tqComparator = new TQComparator(rc, tot_guarant);
-    PriorityQueue<TempQueue> orderedByNeed =
-                                 new PriorityQueue<TempQueue>(10,tqComparator);
-    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
-      TempQueue q = i.next();
+    PriorityQueue<TempQueuePerPartition> orderedByNeed =
+        new PriorityQueue<TempQueuePerPartition>(10, tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
       if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
         q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
       } else {
@@ -442,10 +444,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       // place it back in the ordered list of queues, recalculating its place
       // in the order of most under-guaranteed to most over-guaranteed. In this
       // way, the most underserved queue(s) are always given resources first.
-      Collection<TempQueue> underserved =
+      Collection<TempQueuePerPartition> underserved =
           getMostUnderservedQueues(orderedByNeed, tqComparator);
-      for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
-        TempQueue sub = i.next();
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
         Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
             unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
         Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
@@ -466,13 +469,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   // Take the most underserved TempQueue (the one on the head). Collect and
   // return the list of all queues that have the same idealAssigned
   // percentage of guaranteed.
-  protected Collection<TempQueue> getMostUnderservedQueues(
-      PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
-    ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
+  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
+      PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>();
     while (!orderedByNeed.isEmpty()) {
-      TempQueue q1 = orderedByNeed.remove();
+      TempQueuePerPartition q1 = orderedByNeed.remove();
       underserved.add(q1);
-      TempQueue q2 = orderedByNeed.peek();
+      TempQueuePerPartition q2 = orderedByNeed.peek();
       // q1's pct of guaranteed won't be larger than q2's. If it's less, then
       // return what has already been collected. Otherwise, q1's pct of
       // guaranteed == that of q2, so add q2 to underserved list during the
@@ -491,24 +494,90 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param queues the list of queues to consider
    */
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      Collection<TempQueue> queues, boolean ignoreGuar) {
+      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
     
     if (ignoreGuar) {
-      for (TempQueue q : queues) {
+      for (TempQueuePerPartition q : queues) {
         q.normalizedGuarantee = (float)  1.0f / ((float) queues.size());
       }
     } else {
-      for (TempQueue q : queues) {
+      for (TempQueuePerPartition q : queues) {
         Resources.addTo(activeCap, q.guaranteed);
       }
-      for (TempQueue q : queues) {
+      for (TempQueuePerPartition q : queues) {
         q.normalizedGuarantee = Resources.divide(rc, clusterResource,
             q.guaranteed, activeCap);
       }
     }
   }
 
+  private String getPartitionByNodeId(NodeId nodeId) {
+    return scheduler.getSchedulerNode(nodeId).getPartition();
+  }
+
+  /**
+   * Return should we preempt rmContainer. If we should, deduct from
+   * <code>resourceToObtainByPartition</code>
+   */
+  private boolean tryPreemptContainerAndDeductResToObtain(
+      Map<String, Resource> resourceToObtainByPartitions,
+      RMContainer rmContainer, Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
+    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+    // We will not account resource of a container twice or more
+    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+      return false;
+    }
+
+    String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
+    Resource toObtainByPartition =
+        resourceToObtainByPartitions.get(nodePartition);
+
+    if (null != toObtainByPartition
+        && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
+            Resources.none())) {
+      Resources.subtractFrom(toObtainByPartition,
+          rmContainer.getAllocatedResource());
+      // When we have no more resource need to obtain, remove from map.
+      if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
+          Resources.none())) {
+        resourceToObtainByPartitions.remove(nodePartition);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Marked container=" + rmContainer.getContainerId()
+            + " in partition=" + nodePartition + " will be preempted");
+      }
+      // Add to preemptMap
+      addToPreemptMap(preemptMap, attemptId, rmContainer);
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean preemptMapContains(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId attemptId, RMContainer rmContainer) {
+    Set<RMContainer> rmContainers;
+    if (null == (rmContainers = preemptMap.get(attemptId))) {
+      return false;
+    }
+    return rmContainers.contains(rmContainer);
+  }
+
+  private void addToPreemptMap(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+    Set<RMContainer> set;
+    if (null == (set = preemptMap.get(appAttemptId))) {
+      set = new HashSet<RMContainer>();
+      preemptMap.put(appAttemptId, set);
+    }
+    set.add(containerToPreempt);
+  }
+
   /**
    * Based a resource preemption target drop reservations of containers and
    * if necessary select containers for preemption from applications in each
@@ -520,64 +589,106 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @return a map of applciationID to set of containers to preempt
    */
   private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
-      List<TempQueue> queues, Resource clusterResource) {
+      Set<String> leafQueueNames, Resource clusterResource) {
 
-    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
-        new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
+        new HashMap<ApplicationAttemptId, Set<RMContainer>>();
     List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
-    for (TempQueue qT : queues) {
-      if (qT.preemptionDisabled && qT.leafQueue != null) {
+    // Loop all leaf queues
+    for (String queueName : leafQueueNames) {
+      // check if preemption disabled for the queue
+      if (getQueueByPartition(queueName,
+          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
         if (LOG.isDebugEnabled()) {
-          if (Resources.greaterThan(rc, clusterResource,
-              qT.toBePreempted, Resource.newInstance(0, 0))) {
-            LOG.debug("Tried to preempt the following "
-                      + "resources from non-preemptable queue: "
-                      + qT.queueName + " - Resources: " + qT.toBePreempted);
-          }
+          LOG.debug("skipping from queue=" + queueName
+              + " because it's a non-preemptable queue");
         }
         continue;
       }
-      // we act only if we are violating balance by more than
-      // maxIgnoredOverCapacity
-      if (Resources.greaterThan(rc, clusterResource, qT.current,
-          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
-        // we introduce a dampening factor naturalTerminationFactor that
-        // accounts for natural termination of containers
-        Resource resToObtain =
-          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
-        Resource skippedAMSize = Resource.newInstance(0, 0);
 
-        // lock the leafqueue while we scan applications and unreserve
-        synchronized (qT.leafQueue) {
-          Iterator<FiCaSchedulerApp> desc =   
-            qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
+      // compute resToObtainByPartition considered inter-queue preemption
+      LeafQueue leafQueue = null;
+
+      Map<String, Resource> resToObtainByPartition =
+          new HashMap<String, Resource>();
+      for (TempQueuePerPartition qT : getQueuePartitions(queueName)) {
+        leafQueue = qT.leafQueue;
+        // we act only if we are violating balance by more than
+        // maxIgnoredOverCapacity
+        if (Resources.greaterThan(rc, clusterResource, qT.current,
+            Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+          // we introduce a dampening factor naturalTerminationFactor that
+          // accounts for natural termination of containers
+          Resource resToObtain =
+              Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+          // Only add resToObtain when it >= 0
+          if (Resources.greaterThan(rc, clusterResource, resToObtain,
+              Resources.none())) {
+            resToObtainByPartition.put(qT.partition, resToObtain);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queue=" + queueName + " partition=" + qT.partition
+                  + " resource-to-obtain=" + resToObtain);
+            }
+          }
           qT.actuallyPreempted = Resources.clone(resToObtain);
-          while (desc.hasNext()) {
-            FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
-                Resources.none())) {
-              break;
+        } else {
+          qT.actuallyPreempted = Resources.none();
+        }
+      }
+
+      synchronized (leafQueue) {
+        // go through all ignore-partition-exclusivity containers first to make
+        // sure such containers will be preempted first
+        Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
+            leafQueue.getIgnoreExclusivityRMContainers();
+        for (String partition : resToObtainByPartition.keySet()) {
+          if (ignorePartitionExclusivityContainers.containsKey(partition)) {
+            TreeSet<RMContainer> rmContainers =
+                ignorePartitionExclusivityContainers.get(partition);
+            // We will check container from reverse order, so latter submitted
+            // application's containers will be preempted first.
+            for (RMContainer c : rmContainers.descendingSet()) {
+              boolean preempted =
+                  tryPreemptContainerAndDeductResToObtain(
+                      resToObtainByPartition, c, clusterResource, preemptMap);
+              if (!preempted) {
+                break;
+              }
             }
-            preemptMap.put(
-                fc.getApplicationAttemptId(),
-                preemptFrom(fc, clusterResource, resToObtain,
-                    skippedAMContainerlist, skippedAMSize));
           }
-          Resource maxAMCapacityForThisQueue = Resources.multiply(
-              Resources.multiply(clusterResource,
-                  qT.leafQueue.getAbsoluteCapacity()),
-              qT.leafQueue.getMaxAMResourcePerQueuePercent());
-
-          // Can try preempting AMContainers (still saving atmost
-          // maxAMCapacityForThisQueue AMResource's) if more resources are
-          // required to be preempted from this Queue.
-          preemptAMContainers(clusterResource, preemptMap,
-              skippedAMContainerlist, resToObtain, skippedAMSize,
-              maxAMCapacityForThisQueue);
         }
+
+        // preempt other containers
+        Resource skippedAMSize = Resource.newInstance(0, 0);
+        Iterator<FiCaSchedulerApp> desc =
+            leafQueue.getOrderingPolicy().getPreemptionIterator();
+        while (desc.hasNext()) {
+          FiCaSchedulerApp fc = desc.next();
+          // When we complete preempt from one partition, we will remove from
+          // resToObtainByPartition, so when it becomes empty, we can get no
+          // more preemption is needed
+          if (resToObtainByPartition.isEmpty()) {
+            break;
+          }
+
+          preemptFrom(fc, clusterResource, resToObtainByPartition,
+              skippedAMContainerlist, skippedAMSize, preemptMap);
+        }
+
+        // Can try preempting AMContainers (still saving atmost
+        // maxAMCapacityForThisQueue AMResource's) if more resources are
+        // required to be preempted from this Queue.
+        Resource maxAMCapacityForThisQueue = Resources.multiply(
+            Resources.multiply(clusterResource,
+                leafQueue.getAbsoluteCapacity()),
+            leafQueue.getMaxAMResourcePerQueuePercent());
+
+        preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
+            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue);
       }
     }
+
     return preemptMap;
   }
 
@@ -595,31 +706,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    */
   private void preemptAMContainers(Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      List<RMContainer> skippedAMContainerlist, Resource resToObtain,
-      Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+      List<RMContainer> skippedAMContainerlist,
+      Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
+      Resource maxAMCapacityForThisQueue) {
     for (RMContainer c : skippedAMContainerlist) {
       // Got required amount of resources for preemption, can stop now
-      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
-          Resources.none())) {
+      if (resToObtainByPartition.isEmpty()) {
         break;
       }
       // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
-      // container selection iteration for preemption will be stopped. 
+      // container selection iteration for preemption will be stopped.
       if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
           maxAMCapacityForThisQueue)) {
         break;
       }
-      Set<RMContainer> contToPrempt = preemptMap.get(c
-          .getApplicationAttemptId());
-      if (null == contToPrempt) {
-        contToPrempt = new HashSet<RMContainer>();
-        preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+
+      boolean preempted =
+          tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+              clusterResource, preemptMap);
+      if (preempted) {
+        Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
       }
-      contToPrempt.add(c);
-      
-      Resources.subtractFrom(resToObtain, c.getContainer().getResource());
-      Resources.subtractFrom(skippedAMSize, c.getContainer()
-          .getResource());
     }
     skippedAMContainerlist.clear();
   }
@@ -627,71 +734,59 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   /**
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
-   *
-   * @param app
-   * @param clusterResource
-   * @param rsrcPreempt
-   * @return Set<RMContainer> Set of RMContainers
    */
-  private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
-      Resource clusterResource, Resource rsrcPreempt,
-      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
-    Set<RMContainer> ret = new HashSet<RMContainer>();
+  private void preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
     ApplicationAttemptId appId = app.getApplicationAttemptId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking at application=" + app.getApplicationAttemptId()
+          + " resourceToObtain=" + resToObtainByPartition);
+    }
 
     // first drop reserved containers towards rsrcPreempt
-    List<RMContainer> reservations =
+    List<RMContainer> reservedContainers =
         new ArrayList<RMContainer>(app.getReservedContainers());
-    for (RMContainer c : reservations) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-          rsrcPreempt, Resources.none())) {
-        return ret;
+    for (RMContainer c : reservedContainers) {
+      if (resToObtainByPartition.isEmpty()) {
+        return;
       }
+
+      // Try to preempt this container
+      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, preemptMap);
+
       if (!observeOnly) {
         dispatcher.handle(new ContainerPreemptEvent(appId, c,
             ContainerPreemptEventType.DROP_RESERVATION));
       }
-      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
 
     // if more resources are to be freed go through all live containers in
     // reverse priority and reverse allocation order and mark them for
     // preemption
-    List<RMContainer> containers =
+    List<RMContainer> liveContainers =
       new ArrayList<RMContainer>(app.getLiveContainers());
 
-    sortContainers(containers);
+    sortContainers(liveContainers);
 
-    for (RMContainer c : containers) {
-      if (Resources.lessThanOrEqual(rc, clusterResource,
-            rsrcPreempt, Resources.none())) {
-        return ret;
+    for (RMContainer c : liveContainers) {
+      if (resToObtainByPartition.isEmpty()) {
+        return;
       }
+
       // Skip AM Container from preemption for now.
       if (c.isAMContainer()) {
         skippedAMContainerlist.add(c);
-        Resources.addTo(skippedAMSize, c.getContainer().getResource());
-        continue;
-      }
-      // skip Labeled resource
-      if(isLabeledContainer(c)){
+        Resources.addTo(skippedAMSize, c.getAllocatedResource());
         continue;
       }
-      ret.add(c);
-      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
-    }
 
-    return ret;
-  }
-  
-  /**
-   * Checking if given container is a labeled container
-   * 
-   * @param c
-   * @return true/false
-   */
-  private boolean isLabeledContainer(RMContainer c) {
-    return labels.containsKey(c.getAllocatedNode());
+      // Try to preempt this container
+      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, preemptMap);
+    }
   }
 
   /**
@@ -733,32 +828,48 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * the leaves. Finally it aggregates pending resources in each queue and rolls
    * it up to higher levels.
    *
-   * @param root the root of the CapacityScheduler queue hierarchy
-   * @param clusterResources the total amount of resources in the cluster
+   * @param curQueue current queue which I'm looking at now
+   * @param partitionResource the total amount of resources in the cluster
    * @return the root of the cloned queue hierarchy
    */
-  private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
-    TempQueue ret;
-    synchronized (root) {
-      String queueName = root.getQueueName();
-      float absUsed = root.getAbsoluteUsedCapacity();
-      float absCap = root.getAbsoluteCapacity();
-      float absMaxCap = root.getAbsoluteMaximumCapacity();
-      boolean preemptionDisabled = root.getPreemptionDisabled();
-
-      Resource current = Resources.multiply(clusterResources, absUsed);
-      Resource guaranteed = Resources.multiply(clusterResources, absCap);
-      Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+  private TempQueuePerPartition cloneQueues(CSQueue curQueue,
+      Resource partitionResource, String partitionToLookAt) {
+    TempQueuePerPartition ret;
+    synchronized (curQueue) {
+      String queueName = curQueue.getQueueName();
+      QueueCapacities qc = curQueue.getQueueCapacities();
+      float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
+      float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
+      float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
+      boolean preemptionDisabled = curQueue.getPreemptionDisabled();
+
+      Resource current = Resources.multiply(partitionResource, absUsed);
+      Resource guaranteed = Resources.multiply(partitionResource, absCap);
+      Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+
+      // when partition is a non-exclusive partition, the actual maxCapacity
+      // could more than specified maxCapacity
+      try {
+        if (!scheduler.getRMContext().getNodeLabelManager()
+            .isExclusiveNodeLabel(partitionToLookAt)) {
+          maxCapacity =
+              Resources.max(rc, partitionResource, maxCapacity, current);
+        }
+      } catch (IOException e) {
+        // This may cause by partition removed when running capacity monitor,
+        // just ignore the error, this will be corrected when doing next check.
+      }
 
       Resource extra = Resource.newInstance(0, 0);
-      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+      if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
         extra = Resources.subtract(current, guaranteed);
       }
-      if (root instanceof LeafQueue) {
-        LeafQueue l = (LeafQueue) root;
-        Resource pending = l.getTotalResourcePending();
-        ret = new TempQueue(queueName, current, pending, guaranteed,
-            maxCapacity, preemptionDisabled);
+      if (curQueue instanceof LeafQueue) {
+        LeafQueue l = (LeafQueue) curQueue;
+        Resource pending =
+            l.getQueueResourceUsage().getPending(partitionToLookAt);
+        ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
+            maxCapacity, preemptionDisabled, partitionToLookAt);
         if (preemptionDisabled) {
           ret.untouchableExtra = extra;
         } else {
@@ -767,17 +878,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
-            maxCapacity, false);
+        ret =
+            new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
+                guaranteed, maxCapacity, false, partitionToLookAt);
         Resource childrensPreemptable = Resource.newInstance(0, 0);
-        for (CSQueue c : root.getChildQueues()) {
-          TempQueue subq = cloneQueues(c, clusterResources);
+        for (CSQueue c : curQueue.getChildQueues()) {
+          TempQueuePerPartition subq =
+              cloneQueues(c, partitionResource, partitionToLookAt);
           Resources.addTo(childrensPreemptable, subq.preemptableExtra);
           ret.addChild(subq);
         }
         // untouchableExtra = max(extra - childrenPreemptable, 0)
         if (Resources.greaterThanOrEqual(
-              rc, clusterResources, childrensPreemptable, extra)) {
+              rc, partitionResource, childrensPreemptable, extra)) {
           ret.untouchableExtra = Resource.newInstance(0, 0);
         } else {
           ret.untouchableExtra =
@@ -785,52 +898,87 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         }
       }
     }
+    addTempQueuePartition(ret);
     return ret;
   }
 
   // simple printout function that reports internal queue state (useful for
   // plotting)
-  private void logToCSV(List<TempQueue> unorderedqueues){
-    List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
-    Collections.sort(queues, new Comparator<TempQueue>(){
-      @Override
-      public int compare(TempQueue o1, TempQueue o2) {
-        return o1.queueName.compareTo(o2.queueName);
-      }});
+  private void logToCSV(List<String> leafQueueNames){
+    Collections.sort(leafQueueNames);
     String queueState = " QUEUESTATE: " + clock.getTime();
     StringBuilder sb = new StringBuilder();
     sb.append(queueState);
-    for (TempQueue tq : queues) {
+
+    for (String queueName : leafQueueNames) {
+      TempQueuePerPartition tq =
+          getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
       sb.append(", ");
       tq.appendLogString(sb);
     }
     LOG.debug(sb.toString());
   }
 
+  private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
+    String queueName = queuePartition.queueName;
+
+    Map<String, TempQueuePerPartition> queuePartitions;
+    if (null == (queuePartitions = queueToPartitions.get(queueName))) {
+      queuePartitions = new HashMap<String, TempQueuePerPartition>();
+      queueToPartitions.put(queueName, queuePartitions);
+    }
+    queuePartitions.put(queuePartition.partition, queuePartition);
+  }
+
+  /**
+   * Get queue partition by given queueName and partitionName
+   */
+  private TempQueuePerPartition getQueueByPartition(String queueName,
+      String partition) {
+    Map<String, TempQueuePerPartition> partitionToQueues = null;
+    if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
+      return null;
+    }
+    return partitionToQueues.get(partition);
+  }
+
+  /**
+   * Get all queue partitions by given queueName
+   */
+  private Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
+    if (!queueToPartitions.containsKey(queueName)) {
+      return null;
+    }
+    return queueToPartitions.get(queueName).values();
+  }
+
   /**
    * Temporary data-structure tracking resource availability, pending resource
-   * need, current utilization. Used to clone {@link CSQueue}.
+   * need, current utilization. This is per-queue-per-partition data structure
    */
-  static class TempQueue {
+  static class TempQueuePerPartition {
     final String queueName;
     final Resource current;
     final Resource pending;
     final Resource guaranteed;
     final Resource maxCapacity;
+    final String partition;
     Resource idealAssigned;
     Resource toBePreempted;
+    // For logging purpose
     Resource actuallyPreempted;
     Resource untouchableExtra;
     Resource preemptableExtra;
 
     double normalizedGuarantee;
 
-    final ArrayList<TempQueue> children;
+    final ArrayList<TempQueuePerPartition> children;
     LeafQueue leafQueue;
     boolean preemptionDisabled;
 
-    TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
+    TempQueuePerPartition(String queueName, Resource current, Resource pending,
+        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
+        String partition) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -840,10 +988,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
-      this.children = new ArrayList<TempQueue>();
+      this.children = new ArrayList<TempQueuePerPartition>();
       this.untouchableExtra = Resource.newInstance(0, 0);
       this.preemptableExtra = Resource.newInstance(0, 0);
       this.preemptionDisabled = preemptionDisabled;
+      this.partition = partition;
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -855,19 +1004,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
      * When adding a child we also aggregate its pending resource needs.
      * @param q the child queue to add to this queue
      */
-    public void addChild(TempQueue q) {
+    public void addChild(TempQueuePerPartition q) {
       assert leafQueue == null;
       children.add(q);
       Resources.addTo(pending, q.pending);
     }
 
-    public void addChildren(ArrayList<TempQueue> queues) {
+    public void addChildren(ArrayList<TempQueuePerPartition> queues) {
       assert leafQueue == null;
       children.addAll(queues);
     }
 
 
-    public ArrayList<TempQueue> getChildren(){
+    public ArrayList<TempQueuePerPartition> getChildren(){
       return children;
     }
 
@@ -909,7 +1058,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
     public void printAll() {
       LOG.info(this.toString());
-      for (TempQueue sub : this.getChildren()) {
+      for (TempQueuePerPartition sub : this.getChildren()) {
         sub.printAll();
       }
     }
@@ -942,7 +1091,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
   }
 
-  static class TQComparator implements Comparator<TempQueue> {
+  static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
     private Resource clusterRes;
 
@@ -952,7 +1101,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
 
     @Override
-    public int compare(TempQueue tq1, TempQueue tq2) {
+    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
       if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
         return -1;
       }
@@ -965,7 +1114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // Calculates idealAssigned / guaranteed
     // TempQueues with 0 guarantees are always considered the most over
     // capacity and therefore considered last for resources.
-    private double getIdealPctOfGuaranteed(TempQueue q) {
+    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
       double pctOver = Integer.MAX_VALUE;
       if (q != null && Resources.greaterThan(
           rc, clusterRes, q.guaranteed, Resources.none())) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 2750d4e..316a450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer {
+public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
 
   private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
 
@@ -615,4 +615,30 @@ public class RMContainerImpl implements RMContainer {
     }
     return nodeLabelExpression;
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof RMContainer) {
+      if (null != getContainerId()) {
+        return getContainerId().equals(((RMContainer) obj).getContainerId());
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    if (null != getContainerId()) {
+      return getContainerId().hashCode();
+    }
+    return super.hashCode();
+  }
+
+  @Override
+  public int compareTo(RMContainer o) {
+    if (containerId != null && o.getContainerId() != null) {
+      return containerId.compareTo(o.getContainerId());
+    }
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1e1623d..48c7f2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -153,7 +153,7 @@ public class CapacityScheduler extends
   static final PartitionedQueueComparator partitionedQueueComparator =
       new PartitionedQueueComparator();
 
-  static final Comparator<FiCaSchedulerApp> applicationComparator = 
+  public static final Comparator<FiCaSchedulerApp> applicationComparator =
     new Comparator<FiCaSchedulerApp>() {
     @Override
     public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 22aafaa..56ade84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -68,9 +68,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -118,11 +119,16 @@ public class LeafQueue extends AbstractCSQueue {
   
   private final QueueResourceLimitsInfo queueResourceLimitsInfo =
       new QueueResourceLimitsInfo();
-  
+
   private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
 
   private OrderingPolicy<FiCaSchedulerApp> 
     orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
+
+  // record all ignore partition exclusivityRMContainer, this will be used to do
+  // preemption, key is the partition of the RMContainer allocated on
+  private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
+      new HashMap<>();
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -921,11 +927,16 @@ public class LeafQueue extends AbstractCSQueue {
           Resource assigned = assignment.getResource();
           if (Resources.greaterThan(
               resourceCalculator, clusterResource, assigned, Resources.none())) {
+            // Get reserved or allocated container from application
+            RMContainer reservedOrAllocatedRMContainer =
+                application.getRMContainer(assignment
+                    .getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId());
 
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned,
-                node.getPartition());
+                node.getPartition(), reservedOrAllocatedRMContainer);
             
             // Don't reset scheduling opportunities for offswitch assignments
             // otherwise the app will be delayed for each non-local assignment.
@@ -1720,7 +1731,7 @@ public class LeafQueue extends AbstractCSQueue {
           orderingPolicy.containerReleased(application, rmContainer);
           
           releaseResource(clusterResource, application,
-              container.getResource(), node.getPartition());
+              container.getResource(), node.getPartition(), rmContainer);
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1738,9 +1749,22 @@ public class LeafQueue extends AbstractCSQueue {
 
   synchronized void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
-      String nodePartition) {
+      String nodePartition, RMContainer rmContainer) {
     super.allocateResource(clusterResource, resource, nodePartition);
     
+    // handle ignore exclusivity container
+    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+        RMNodeLabelsManager.NO_LABEL)
+        && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      TreeSet<RMContainer> rmContainers = null;
+      if (null == (rmContainers =
+          ignorePartitionExclusivityRMContainers.get(nodePartition))) {
+        rmContainers = new TreeSet<>();
+        ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
+      }
+      rmContainers.add(rmContainer);
+    }
+
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
@@ -1760,10 +1784,25 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  synchronized void releaseResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource, String nodePartition) {
+  synchronized void releaseResource(Resource clusterResource,
+      FiCaSchedulerApp application, Resource resource, String nodePartition,
+      RMContainer rmContainer) {
     super.releaseResource(clusterResource, resource, nodePartition);
     
+    // handle ignore exclusivity container
+    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+        RMNodeLabelsManager.NO_LABEL)
+        && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
+        Set<RMContainer> rmContainers =
+            ignorePartitionExclusivityRMContainers.get(nodePartition);
+        rmContainers.remove(rmContainer);
+        if (rmContainers.isEmpty()) {
+          ignorePartitionExclusivityRMContainers.remove(nodePartition);
+        }
+      }
+    }
+
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
@@ -1912,7 +1951,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), node.getPartition());
+          .getResource(), node.getPartition(), rmContainer);
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1953,7 +1992,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition());
+          .getResource(), node.getPartition(), rmContainer);
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1971,7 +2010,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition());
+          .getResource(), node.getPartition(), rmContainer);
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1982,6 +2021,17 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
   
+  /**
+   * return all ignored partition exclusivity RMContainers in the LeafQueue, this
+   * will be used by preemption policy, and use of return
+   * ignorePartitionExclusivityRMContainer should protected by LeafQueue
+   * synchronized lock
+   */
+  public synchronized Map<String, TreeSet<RMContainer>>
+      getIgnoreExclusivityRMContainers() {
+    return ignorePartitionExclusivityRMContainers;
+  }
+
   public void setCapacity(float capacity) {
     queueCapacities.setCapacity(capacity);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
index c5c067d..5158255 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class AssignmentInformation {
@@ -117,4 +118,24 @@ public class AssignmentInformation {
   public List<AssignmentDetails> getReservationDetails() {
     return operationDetails.get(Operation.RESERVATION);
   }
+
+  private ContainerId getFirstContainerIdFromOperation(Operation op) {
+    if (null != operationDetails.get(Operation.ALLOCATION)) {
+      List<AssignmentDetails> assignDetails =
+          operationDetails.get(Operation.ALLOCATION);
+      if (!assignDetails.isEmpty()) {
+        return assignDetails.get(0).containerId;
+      }
+    }
+    return null;
+  }
+
+  public ContainerId getFirstAllocatedOrReservedContainerId() {
+    ContainerId containerId = null;
+    containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION);
+    if (null != containerId) {
+      return containerId;
+    }
+    return getFirstContainerIdFromOperation(Operation.RESERVATION);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 9e8b769..6c0ed6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -25,11 +25,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
@@ -37,27 +38,17 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer; 
 
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
-import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeSet;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,23 +67,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import org.mortbay.log.Log;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestProportionalCapacityPreemptionPolicy {
 
@@ -799,50 +794,6 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @Test
-  public void testIdealAllocationForLabels() {
-    int[][] qData = new int[][] {
-    // / A B
-        { 80, 40, 40 }, // abs
-        { 80, 80, 80 }, // maxcap
-        { 80, 80, 0 }, // used
-        { 70, 20, 50 }, // pending
-        { 0, 0, 0 }, // reserved
-        { 5, 4, 1 }, // apps
-        { -1, 1, 1 }, // req granularity
-        { 2, 0, 0 }, // subqueues
-    };
-    setAMContainer = true;
-    setLabeledContainer = true;
-    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
-    NodeId node = NodeId.newInstance("node1", 0);
-    Set<String> labelSet = new HashSet<String>();
-    labelSet.add("x");
-    labels.put(node, labelSet);
-    when(lm.getNodeLabels()).thenReturn(labels);
-    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
-    // Subtracting Label X resources from cluster resources
-    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
-        Resources.clone(Resource.newInstance(80, 0)));
-    clusterResources.setMemory(100);
-    policy.editSchedule();
-
-    // By skipping AM Container and Labeled container, all other 18 containers
-    // of appD will be
-    // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
-
-    // By skipping AM Container and Labeled container, all other 18 containers
-    // of appC will be
-    // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
-
-    // rest 4 containers from appB will be preempted
-    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
-    setAMContainer = false;
-    setLabeledContainer = false;
-  }
-
-  @Test
   public void testPreemptSkippedAMContainers() {
     int[][] qData = new int[][] {
         //  /   A   B
@@ -944,6 +895,12 @@ public class TestProportionalCapacityPreemptionPolicy {
     clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
+    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+        clusterResources);
+
+    SchedulerNode mNode = mock(SchedulerNode.class);
+    when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
+    when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
     return policy;
   }
 
@@ -965,11 +922,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     float tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
-    when(root.getQueueName()).thenReturn("/");
+    when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
     when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
-    when(root.getQueuePath()).thenReturn("root");
+    QueueCapacities rootQc = new QueueCapacities(true);
+    rootQc.setAbsoluteUsedCapacity(used[0] / tot);
+    rootQc.setAbsoluteCapacity(abs[0] / tot);
+    rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
+    when(root.getQueueCapacities()).thenReturn(rootQc);
+    when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     boolean preemptionDisabled = mockPreemptionStatus("root");
     when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
 
@@ -987,6 +949,14 @@ public class TestProportionalCapacityPreemptionPolicy {
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+
+      // We need to make these fields to QueueCapacities
+      QueueCapacities qc = new QueueCapacities(false);
+      qc.setAbsoluteUsedCapacity(used[i] / tot);
+      qc.setAbsoluteCapacity(abs[i] / tot);
+      qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
+      when(q.getQueueCapacities()).thenReturn(qc);
+
       String parentPathName = p.getQueuePath();
       parentPathName = (parentPathName == null) ? "root" : parentPathName;
       String queuePathName = (parentPathName+"."+queueName).replace("/","root");
@@ -1028,6 +998,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     return pq;
   }
 
+  @SuppressWarnings("rawtypes")
   LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
@@ -1035,6 +1006,10 @@ public class TestProportionalCapacityPreemptionPolicy {
         new ArrayList<ApplicationAttemptId>();
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
+    // need to set pending resource in resource usage as well
+    ResourceUsage ru = new ResourceUsage();
+    ru.setPending(Resource.newInstance(pending[i], 0));
+    when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
@@ -1124,6 +1099,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
     when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+    when(mC.getAllocatedResource()).thenReturn(r);
     if (priority.AMCONTAINER.getValue() == cpriority) {
       when(mC.isAMContainer()).thenReturn(true);
     }


Mime
View raw message