hadoop-common-commits mailing list archives

From wan...@apache.org
Subject [1/2] hadoop git commit: YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)
Date Mon, 31 Oct 2016 22:19:11 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 773c60bd7 -> 90dd3a814


http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
new file mode 100644
index 0000000..fccd2a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+/**
+ * Temporary data structure tracking resource availability, pending resource
+ * need, and current utilization for an application.
+ */
+public class TempAppPerPartition extends AbstractPreemptionEntity {
+
+  // Following fields are set and used by candidate selection policies
+  private final int priority;
+  private final ApplicationId applicationId;
+
+  FiCaSchedulerApp app;
+
+  TempAppPerPartition(FiCaSchedulerApp app, Resource usedPerPartition,
+      Resource amUsedPerPartition, Resource reserved,
+      Resource pendingPerPartition) {
+    super(app.getQueueName(), usedPerPartition, amUsedPerPartition, reserved,
+        pendingPerPartition);
+
+    this.priority = app.getPriority().getPriority();
+    this.applicationId = app.getApplicationId();
+    this.app = app;
+  }
+
+  public FiCaSchedulerApp getFiCaSchedulerApp() {
+    return app;
+  }
+
+  public void assignPreemption(Resource killable) {
+    Resources.addTo(toBePreempted, killable);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority)
+        .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending)
+        .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" PREEMPT_OTHER: ")
+        .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
+        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
+        .append(getActuallyToBePreempted()).append("\n");
+
+    return sb.toString();
+  }
+
+  void appendLogString(StringBuilder sb) {
+    sb.append(queueName).append(", ").append(getUsed().getMemorySize())
+        .append(", ").append(getUsed().getVirtualCores()).append(", ")
+        .append(pending.getMemorySize()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemorySize()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemorySize()).append(", ")
+        .append(toBePreempted.getVirtualCores()).append(", ")
+        .append(getActuallyToBePreempted().getMemorySize()).append(", ")
+        .append(getActuallyToBePreempted().getVirtualCores());
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
+      Resource cluster, Resource toBeDeduct, String partition) {
+    if (Resources.greaterThan(resourceCalculator, cluster,
+        getActuallyToBePreempted(), toBeDeduct)) {
+      Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
+    }
+  }
+}
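
A minimal usage sketch for the new class (illustrative, not part of the commit): a candidate selection policy wraps a scheduler app snapshot, marks resources to preempt, and later deducts what another selector already claimed. The app/used/amUsed/reserved/pending values are assumed to come from the scheduler's read-only snapshot (with the org.apache.hadoop.yarn.util.resource imports in scope), and since the constructor is package-private this would live in the same monitor.capacity package.

    ResourceCalculator rc = new DefaultResourceCalculator();
    Resource cluster = Resource.newInstance(100 * 1024, 100);
    TempAppPerPartition tmpApp =
        new TempAppPerPartition(app, used, amUsed, reserved, pending);
    // Mark 2GB / 2 vcores of this app's containers as ideal to preempt.
    tmpApp.assignPreemption(Resource.newInstance(2048, 2));
    // Subtract a resource another selector already claimed; the deduction
    // only applies while the actually-to-be-preempted amount is larger
    // ("" denotes the default partition).
    tmpApp.deductActuallyToBePreempted(rc, cluster,
        Resource.newInstance(1024, 1), "");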

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 04ed135..28099c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -25,34 +25,29 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.Collection;
 
 /**
  * Temporary data-structure tracking resource availability, pending resource
  * need, current utilization. This is per-queue-per-partition data structure
  */
-public class TempQueuePerPartition {
+public class TempQueuePerPartition extends AbstractPreemptionEntity {
   // Following fields are copied from scheduler
-  final String queueName;
   final String partition;
-  final Resource pending;
 
-  private final Resource current;
   private final Resource killable;
-  private final Resource reserved;
   private final float absCapacity;
   private final float absMaxCapacity;
   final Resource totalPartitionResource;
 
-  // Following fields are setted and used by candidate selection policies
-  Resource idealAssigned;
-  Resource toBePreempted;
+  // Following fields are set and used by candidate selection policies
   Resource untouchableExtra;
   Resource preemptableExtra;
-  private Resource actuallyToBePreempted;
 
   double normalizedGuarantee;
 
   final ArrayList<TempQueuePerPartition> children;
+  private Collection<TempAppPerPartition> apps;
   LeafQueue leafQueue;
   boolean preemptionDisabled;
 
@@ -60,8 +55,8 @@ public class TempQueuePerPartition {
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
       Resource reserved, CSQueue queue) {
-    this.queueName = queueName;
-    this.current = current;
+    super(queueName, current, Resource.newInstance(0, 0), reserved,
+        Resource.newInstance(0, 0));
 
     if (queue instanceof LeafQueue) {
       LeafQueue l = (LeafQueue) queue;
@@ -72,11 +67,9 @@ public class TempQueuePerPartition {
       pending = Resources.createResource(0);
     }
 
-    this.idealAssigned = Resource.newInstance(0, 0);
-    this.actuallyToBePreempted = Resource.newInstance(0, 0);
-    this.toBePreempted = Resource.newInstance(0, 0);
     this.normalizedGuarantee = Float.NaN;
     this.children = new ArrayList<>();
+    this.apps = new ArrayList<>();
     this.untouchableExtra = Resource.newInstance(0, 0);
     this.preemptableExtra = Resource.newInstance(0, 0);
     this.preemptionDisabled = preemptionDisabled;
@@ -85,7 +78,6 @@ public class TempQueuePerPartition {
     this.absCapacity = absCapacity;
     this.absMaxCapacity = absMaxCapacity;
     this.totalPartitionResource = totalPartitionResource;
-    this.reserved = reserved;
   }
 
   public void setLeafQueue(LeafQueue l) {
@@ -95,7 +87,9 @@ public class TempQueuePerPartition {
 
   /**
    * When adding a child we also aggregate its pending resource needs.
-   * @param q the child queue to add to this queue
+   *
+   * @param q
+   *          the child queue to add to this queue
    */
   public void addChild(TempQueuePerPartition q) {
     assert leafQueue == null;
@@ -103,14 +97,10 @@ public class TempQueuePerPartition {
     Resources.addTo(pending, q.pending);
   }
 
-  public ArrayList<TempQueuePerPartition> getChildren(){
+  public ArrayList<TempQueuePerPartition> getChildren() {
     return children;
   }
 
-  public Resource getUsed() {
-    return current;
-  }
-
   public Resource getUsedDeductReservd() {
     return Resources.subtract(current, reserved);
   }
@@ -122,28 +112,30 @@ public class TempQueuePerPartition {
     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
         Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
-    // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+    // remain = avail - min(avail, (max - assigned), (current + pending -
+    // assigned))
     Resource accepted = Resources.min(rc, clusterResource,
-        absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail,
-            Resources
-                /*
-                 * When we're using FifoPreemptionSelector
-                 * (considerReservedResource = false).
-                 *
-                 * We should deduct reserved resource to avoid excessive preemption:
-                 *
-                 * For example, if an under-utilized queue has used = reserved = 20.
-                 * Preemption policy will try to preempt 20 containers
-                 * (which is not satisfied) from different hosts.
-                 *
-                 * In FifoPreemptionSelector, there's no guarantee that preempted
-                 * resource can be used by pending request, so policy will preempt
-                 * resources repeatly.
-                 */
-                .subtract(Resources.add(
-                    (considersReservedResource ? getUsed() :
-                      getUsedDeductReservd()),
-                    pending), idealAssigned)));
+        absMaxCapIdealAssignedDelta,
+        Resources.min(rc, clusterResource, avail, Resources
+            /*
+             * When we're using FifoPreemptionSelector (considerReservedResource
+             * = false).
+             *
+             * We should deduct reserved resource to avoid excessive preemption:
+             *
+             * For example, if an under-utilized queue has used = reserved = 20.
+             * Preemption policy will try to preempt 20 containers (which is not
+             * satisfied) from different hosts.
+             *
+             * In FifoPreemptionSelector, there's no guarantee that preempted
+             * resources can be used by pending requests, so the policy will
+             * preempt resources repeatedly.
+             */
+            .subtract(
+                Resources.add((considersReservedResource
+                    ? getUsed()
+                    : getUsedDeductReservd()), pending),
+                idealAssigned)));
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;
@@ -162,8 +154,7 @@ public class TempQueuePerPartition {
     untouchableExtra = Resources.none();
     preemptableExtra = Resources.none();
 
-    Resource extra = Resources.subtract(getUsed(),
-        getGuaranteed());
+    Resource extra = Resources.subtract(getUsed(), getGuaranteed());
     if (Resources.lessThan(rc, totalPartitionResource, extra,
         Resources.none())) {
       extra = Resources.none();
@@ -197,26 +188,21 @@ public class TempQueuePerPartition {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(" NAME: " + queueName)
-        .append(" CUR: ").append(current)
-        .append(" PEN: ").append(pending)
-        .append(" RESERVED: ").append(reserved)
-        .append(" GAR: ").append(getGuaranteed())
-        .append(" NORM: ").append(normalizedGuarantee)
-        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
-        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
-        .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
+    sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
+        .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
+        .append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
+        .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
         .append(" UNTOUCHABLE: ").append(untouchableExtra)
-        .append(" PREEMPTABLE: ").append(preemptableExtra)
-        .append("\n");
+        .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
 
     return sb.toString();
   }
 
   public void assignPreemption(float scalingFactor, ResourceCalculator rc,
       Resource clusterResource) {
-    Resource usedDeductKillable = Resources.subtract(
-        getUsed(), killable);
+    Resource usedDeductKillable = Resources.subtract(getUsed(), killable);
     Resource totalResource = Resources.add(getUsed(), pending);
 
     // The minimum resource that we need to keep for a queue is:
@@ -224,7 +210,8 @@ public class TempQueuePerPartition {
     //
     // Doing this because when we calculate ideal allocation doesn't consider
     // reserved resource, ideal-allocation calculated could be less than
-    // guaranteed and total. We should avoid preempt from a queue if it is already
+    // guaranteed and total. We should avoid preempting from a queue if it is
+    // already
     // <= its guaranteed resource.
     Resource minimumQueueResource = Resources.max(rc, clusterResource,
         Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
@@ -233,33 +220,26 @@ public class TempQueuePerPartition {
     if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
         minimumQueueResource)) {
       toBePreempted = Resources.multiply(
-          Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor);
+          Resources.subtract(usedDeductKillable, minimumQueueResource),
+          scalingFactor);
     } else {
       toBePreempted = Resources.none();
     }
   }
 
-  public Resource getActuallyToBePreempted() {
-    return actuallyToBePreempted;
-  }
-
-  public void setActuallyToBePreempted(Resource res) {
-    this.actuallyToBePreempted = res;
-  }
-
   public void deductActuallyToBePreempted(ResourceCalculator rc,
       Resource cluster, Resource toBeDeduct) {
-    if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) {
-      Resources.subtractFrom(actuallyToBePreempted, toBeDeduct);
+    if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(),
+        toBeDeduct)) {
+      Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
     }
-    actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted,
-        Resources.none());
+    setActuallyToBePreempted(Resources.max(rc, cluster,
+        getActuallyToBePreempted(), Resources.none()));
   }
 
   void appendLogString(StringBuilder sb) {
-    sb.append(queueName).append(", ")
-        .append(current.getMemorySize()).append(", ")
-        .append(current.getVirtualCores()).append(", ")
+    sb.append(queueName).append(", ").append(current.getMemorySize())
+        .append(", ").append(current.getVirtualCores()).append(", ")
         .append(pending.getMemorySize()).append(", ")
         .append(pending.getVirtualCores()).append(", ")
         .append(getGuaranteed().getMemorySize()).append(", ")
@@ -267,9 +247,17 @@ public class TempQueuePerPartition {
         .append(idealAssigned.getMemorySize()).append(", ")
         .append(idealAssigned.getVirtualCores()).append(", ")
         .append(toBePreempted.getMemorySize()).append(", ")
-        .append(toBePreempted.getVirtualCores() ).append(", ")
-        .append(actuallyToBePreempted.getMemorySize()).append(", ")
-        .append(actuallyToBePreempted.getVirtualCores());
+        .append(toBePreempted.getVirtualCores()).append(", ")
+        .append(getActuallyToBePreempted().getMemorySize()).append(", ")
+        .append(getActuallyToBePreempted().getVirtualCores());
+  }
+
+  public void addAllApps(Collection<TempAppPerPartition> orderedApps) {
+    this.apps = orderedApps;
+  }
+
+  public Collection<TempAppPerPartition> getApps() {
+    return apps;
   }
 
 }
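
To make the rewrapped formula in offer() concrete, here is a small worked example with illustrative numbers (not taken from the commit), assuming considersReservedResource = true so getUsed() is taken as-is:

    // remain = avail - min(avail, (max - idealAssigned),
    //                      (used + pending - idealAssigned))
    // With avail = 30, max = 100, idealAssigned = 40, used = 50, pending = 20:
    //   max - idealAssigned            = 100 - 40      = 60
    //   used + pending - idealAssigned = 50 + 20 - 40  = 30
    //   accepted = min(60, min(30, 30)) = 30
    //   remain   = 30 - 30 = 0, and idealAssigned grows from 40 to 70.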

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 6db5074..cea5aa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1045,6 +1045,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   private static final String PREEMPTION_CONFIG_PREFIX =
       "yarn.resourcemanager.monitor.capacity.preemption.";
 
+  private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX =
+      "intra-queue-preemption.";
+
   /** If true, run the policy but do not affect the cluster with preemption and
    * kill events. */
   public static final String PREEMPTION_OBSERVE_ONLY =
@@ -1098,4 +1101,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
   public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
       false;
+
+  /**
+   * For intra-queue preemption, priority/user-limit/fairness based selectors
+   * can help to preempt containers.
+   */
+  public static final String INTRAQUEUE_PREEMPTION_ENABLED =
+      PREEMPTION_CONFIG_PREFIX +
+      INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled";
+  public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false;
+
+  /**
+   * For intra-queue preemption, consider only those queues whose used capacity
+   * is above this minimum threshold.
+   */
+  public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
+      PREEMPTION_CONFIG_PREFIX +
+      INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold";
+  public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
+      0.5f;
+
+  /**
+   * For intra-queue preemption, the maximum allowable preemptable limit per
+   * queue.
+   */
+  public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
+      PREEMPTION_CONFIG_PREFIX +
+      INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
+  public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
+      0.2f;
 }
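
A configuration sketch (mirroring the test setup later in this commit) showing how the three new knobs are set; the constant names expand to yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.{enabled,minimum-threshold,max-allowable-limit}:

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    // Turn the intra-queue candidate selectors on (disabled by default).
    csConf.setBoolean(
        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
    // Only queues above 50% used capacity are considered (the default).
    csConf.setFloat(
        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
        0.5f);
    // At most 20% per queue is preemptable in a round (the default).
    csConf.setFloat(
        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
        0.2f);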

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3c51961..214c6e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -519,6 +519,7 @@ public class LeafQueue extends AbstractCSQueue {
       // queue metrics are updated, more resource may be available
       // activate the pending applications if possible
       activateApplications();
+
     } finally {
       writeLock.unlock();
     }
@@ -1148,8 +1149,9 @@ public class LeafQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerApp application,
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application, clusterResource, user, partition,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition);
+        computeUserLimit(application.getUser(), clusterResource, user,
+            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+        partition);
   }
 
   private Resource getHeadroom(User user,
@@ -1221,7 +1223,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
     Resource userLimit =
-        computeUserLimit(application, clusterResource, queueUser,
+        computeUserLimit(application.getUser(), clusterResource, queueUser,
             nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
@@ -1259,7 +1261,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Lock(NoLock.class)
-  private Resource computeUserLimit(FiCaSchedulerApp application,
+  private Resource computeUserLimit(String userName,
       Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -1359,7 +1361,6 @@ public class LeafQueue extends AbstractCSQueue {
             minimumAllocation);
 
     if (LOG.isDebugEnabled()) {
-      String userName = application.getUser();
       LOG.debug("User limit computation for " + userName +
           " in queue " + getQueueName() +
           " userLimitPercent=" + userLimit +
@@ -2010,6 +2011,17 @@ public class LeafQueue extends AbstractCSQueue {
         .getSchedulableEntities());
   }
 
+  /**
+   * Obtain (read-only) collection of all applications.
+   */
+  public Collection<FiCaSchedulerApp> getAllApplications() {
+    Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
+        pendingOrderingPolicy.getSchedulableEntities());
+    apps.addAll(orderingPolicy.getSchedulableEntities());
+
+    return Collections.unmodifiableCollection(apps);
+  }
+
   // Consider the headroom for each user in the queue.
   // Total pending for the queue =
   //   sum(for each user(min((user's headroom), sum(user's pending requests))))
@@ -2026,7 +2038,7 @@ public class LeafQueue extends AbstractCSQueue {
         if (!userNameToHeadroom.containsKey(userName)) {
           User user = getUser(userName);
           Resource headroom = Resources.subtract(
-              computeUserLimit(app, resources, user, partition,
+              computeUserLimit(app.getUser(), resources, user, partition,
                   SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
               user.getUsed(partition));
           // Make sure headroom is not negative.
@@ -2048,6 +2060,16 @@ public class LeafQueue extends AbstractCSQueue {
 
   }
 
+  public synchronized Resource getUserLimitPerUser(String userName,
+      Resource resources, String partition) {
+
+    // Check user resource limit
+    User user = getUser(userName);
+
+    return computeUserLimit(userName, resources, user, partition,
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+  }
+
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
@@ -2103,8 +2125,8 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   /**
-   * return all ignored partition exclusivity RMContainers in the LeafQueue, this
-   * will be used by preemption policy.
+   * @return all ignored partition exclusivity RMContainers in the LeafQueue;
+   *         this will be used by the preemption policy.
    */
   public Map<String, TreeSet<RMContainer>>
       getIgnoreExclusivityRMContainers() {
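
A sketch of how a selector might consume the two new LeafQueue accessors (illustrative only; leafQueue, clusterResource, and partition are assumed to come from the preemption policy's context):

    for (FiCaSchedulerApp app : leafQueue.getAllApplications()) {
      // Per-user limit under RESPECT_PARTITION_EXCLUSIVITY, as computed by
      // the new getUserLimitPerUser() accessor.
      Resource userLimit = leafQueue.getUserLimitPerUser(app.getUser(),
          clusterResource, partition);
      // A selector can rank candidates by comparing each user's usage in
      // this partition against userLimit.
    }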

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index aa7ad50..ebe70d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -312,6 +313,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
+  public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() {
+
+    Map<String, Resource> ret = new HashMap<String, Resource>();
+    Resource res = null;
+    for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) {
+      ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*");
+      if ((res = ret.get(rr.getNodeLabelExpression())) == null) {
+        res = Resources.createResource(0, 0);
+        ret.put(rr.getNodeLabelExpression(), res);
+      }
+
+      Resources.addTo(res,
+          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+    }
+    return ret;
+  }
+
   public void markContainerForPreemption(ContainerId cont) {
     try {
       writeLock.lock();
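
Illustrative use of the new FiCaSchedulerApp accessor (an assumption about the calling side, not code from this commit): keys of the returned map are node-label expressions, and pending demand is aggregated as capability times outstanding container count over all scheduler keys' "*" requests.

    Map<String, Resource> pendingPerPartition =
        app.getTotalPendingRequestsPerPartition();
    // Pending demand for requests whose label expression is empty
    // (i.e. the default partition), if any such requests exist.
    Resource defaultPending = pendingPerPartition.get("");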

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 3d3f1ea..5b8425b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -65,11 +65,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
@@ -164,13 +167,17 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         mClock);
   }
 
-  private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
-      String queueName, List<RMContainer> reservedContainers,
-      List<RMContainer> liveContainers) {
+  private void mockContainers(String containersConfig, FiCaSchedulerApp app,
+      ApplicationAttemptId attemptId, String queueName,
+      List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
     int containerId = 1;
     int start = containersConfig.indexOf("=") + 1;
     int end = -1;
 
+    Resource used = Resource.newInstance(0, 0);
+    Resource pending = Resource.newInstance(0, 0);
+    Priority pri = Priority.newInstance(0);
+
     while (start < containersConfig.length()) {
       while (start < containersConfig.length()
           && containersConfig.charAt(start) != '(') {
@@ -192,43 +199,52 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
 
       // now we found start/end, get container values
       String[] values = containersConfig.substring(start + 1, end).split(",");
-      if (values.length != 6) {
+      if (values.length < 6 || values.length > 8) {
         throw new IllegalArgumentException("Format to define container is:"
-            + "(priority,resource,host,expression,repeat,reserved)");
+            + "(priority,resource,host,expression,repeat,reserved, pending)");
       }
-      Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+      pri.setPriority(Integer.valueOf(values[0]));
       Resource res = parseResourceFromString(values[1]);
       NodeId host = NodeId.newInstance(values[2], 1);
-      String exp = values[3];
+      String label = values[3];
+      String userName = "user";
       int repeat = Integer.valueOf(values[4]);
       boolean reserved = Boolean.valueOf(values[5]);
+      if (values.length >= 7) {
+        Resources.addTo(pending, parseResourceFromString(values[6]));
+      }
+      if (values.length == 8) {
+        userName = values[7];
+      }
 
       for (int i = 0; i < repeat; i++) {
         Container c = mock(Container.class);
+        Resources.addTo(used, res);
         when(c.getResource()).thenReturn(res);
         when(c.getPriority()).thenReturn(pri);
         SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
         RMContainerImpl rmc = mock(RMContainerImpl.class);
         when(rmc.getAllocatedSchedulerKey()).thenReturn(sk);
         when(rmc.getAllocatedNode()).thenReturn(host);
-        when(rmc.getNodeLabelExpression()).thenReturn(exp);
+        when(rmc.getNodeLabelExpression()).thenReturn(label);
         when(rmc.getAllocatedResource()).thenReturn(res);
         when(rmc.getContainer()).thenReturn(c);
         when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
         when(rmc.getQueueName()).thenReturn(queueName);
-        final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
-        when(rmc.getContainerId()).thenReturn(
-            cId);
+        final ContainerId cId = ContainerId.newContainerId(attemptId,
+            containerId);
+        when(rmc.getContainerId()).thenReturn(cId);
         doAnswer(new Answer<Integer>() {
           @Override
           public Integer answer(InvocationOnMock invocation) throws Throwable {
-            return cId.compareTo(((RMContainer) invocation.getArguments()[0])
-                .getContainerId());
+            return cId.compareTo(
+                ((RMContainer) invocation.getArguments()[0]).getContainerId());
           }
         }).when(rmc).compareTo(any(RMContainer.class));
 
         if (containerId == 1) {
           when(rmc.isAMContainer()).thenReturn(true);
+          when(app.getAMResource(label)).thenReturn(res);
         }
 
         if (reserved) {
@@ -243,25 +259,44 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
 
         // If this is a non-exclusive allocation
         String partition = null;
-        if (exp.isEmpty()
+        if (label.isEmpty()
             && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
-            .isEmpty()) {
+                .isEmpty()) {
           LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
-          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
-              queue.getIgnoreExclusivityRMContainers();
+          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
+              .getIgnoreExclusivityRMContainers();
           if (!ignoreExclusivityContainers.containsKey(partition)) {
             ignoreExclusivityContainers.put(partition,
                 new TreeSet<RMContainer>());
           }
           ignoreExclusivityContainers.get(partition).add(rmc);
         }
-        LOG.debug("add container to app=" + attemptId + " res=" + res
-            + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+        LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
+            + host + " nodeLabelExpression=" + label + " partition="
             + partition);
 
         containerId++;
       }
 
+      // Fill in some more app-specific aggregated data here.
+      when(app.getPriority()).thenReturn(pri);
+      when(app.getUser()).thenReturn(userName);
+      when(app.getCurrentConsumption()).thenReturn(used);
+      when(app.getCurrentReservation())
+          .thenReturn(Resources.createResource(0, 0));
+
+      Map<String, Resource> pendingForDefaultPartition =
+          new HashMap<String, Resource>();
+      // Add for default partition for now.
+      pendingForDefaultPartition.put(label, pending);
+      when(app.getTotalPendingRequestsPerPartition())
+          .thenReturn(pendingForDefaultPartition);
+
+      // need to set used resource in resource usage as well
+      ResourceUsage ru = new ResourceUsage();
+      ru.setUsed(label, used);
+      when(app.getAppAttemptResourceUsage()).thenReturn(ru);
+
       start = end + 1;
     }
   }
@@ -277,6 +312,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
    */
   private void mockApplications(String appsConfig) {
     int id = 1;
+    HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
+    LeafQueue queue = null;
     for (String a : appsConfig.split(";")) {
       String[] strs = a.split("\t");
       String queueName = strs[0];
@@ -285,24 +322,49 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       List<RMContainer> liveContainers = new ArrayList<RMContainer>();
       List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
       ApplicationId appId = ApplicationId.newInstance(0L, id);
-      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId
+          .newInstance(appId, 1);
 
-      mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
+      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+      when(app.getAMResource(anyString()))
+          .thenReturn(Resources.createResource(0, 0));
+      mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
           liveContainers);
+      LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
 
-      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
       when(app.getLiveContainers()).thenReturn(liveContainers);
       when(app.getReservedContainers()).thenReturn(reservedContainers);
       when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(app.getApplicationId()).thenReturn(appId);
-      when(app.getPriority()).thenReturn(Priority.newInstance(0));
 
       // add to LeafQueue
-      LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+      queue = (LeafQueue) nameToCSQueues.get(queueName);
       queue.getApplications().add(app);
+      queue.getAllApplications().add(app);
 
+      HashSet<String> users = userMap.get(queueName);
+      if (null == users) {
+        users = new HashSet<String>();
+        userMap.put(queueName, users);
+      }
+
+      users.add(app.getUser());
       id++;
     }
+
+    for (String queueName : userMap.keySet()) {
+      queue = (LeafQueue) nameToCSQueues.get(queueName);
+      // Currently we have user-limit test support only for default label.
+      Resource totResoucePerPartition = partitionToResource.get("");
+      Resource capacity = Resources.multiply(totResoucePerPartition,
+          queue.getQueueCapacities().getAbsoluteCapacity());
+      HashSet<String> users = userMap.get(queue.getQueueName());
+      Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
+      for (String user : users) {
+        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
+            anyString())).thenReturn(userLimit);
+      }
+    }
   }
 
   private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
@@ -436,10 +498,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
             new Comparator<FiCaSchedulerApp>() {
               @Override
               public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-                return a1.getApplicationId().compareTo(a2.getApplicationId());
+                if (a1.getPriority() != null
+                    && !a1.getPriority().equals(a2.getPriority())) {
+                  return a1.getPriority().compareTo(a2.getPriority());
+                }
+
+                int res = a1.getApplicationId()
+                    .compareTo(a2.getApplicationId());
+                return res;
               }
             });
         when(leafQueue.getApplications()).thenReturn(apps);
+        when(leafQueue.getAllApplications()).thenReturn(apps);
         OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
         when(so.getPreemptionIterator()).thenAnswer(new Answer() {
           public Object answer(InvocationOnMock invocation) {
@@ -518,10 +588,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       float absUsed = Resources.divide(rc, totResoucePerPartition,
           parseResourceFromString(values[2].trim()), totResoucePerPartition)
           + epsilon;
+      float used = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[2].trim()),
+          parseResourceFromString(values[0].trim())) + epsilon;
       Resource pending = parseResourceFromString(values[3].trim());
       qc.setAbsoluteCapacity(partitionName, absGuaranteed);
       qc.setAbsoluteMaximumCapacity(partitionName, absMax);
       qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      qc.setUsedCapacity(partitionName, used);
+      when(queue.getUsedCapacity()).thenReturn(used);
       ru.setPending(partitionName, pending);
       if (!isParent(queueExprArray, idx)) {
         LeafQueue lq = (LeafQueue) queue;
@@ -536,6 +611,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         reserved = parseResourceFromString(values[4].trim());
         ru.setReserved(partitionName, reserved);
       }
+
       LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
           + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
           + ",abs_used" + absUsed + ",pending_resource=" + pending

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90dd3a81/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
new file mode 100644
index 0000000..19fb0d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -0,0 +1,868 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test class for IntraQueuePreemption scenarios.
+ */
+public class TestProportionalCapacityPreemptionPolicyIntraQueue
+    extends
+      ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  @Test
+  public void testSimpleIntraQueuePreemption() throws IOException {
+    /**
+     * The simplest preemption test. Queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * Guaranteed resources of a/b/c/d are 11:40:20:29. Total cluster
+     * resource = 100.
+     * Scenario:
+     * Queue B has a few running apps, and two high-priority apps have demand.
+     * Apps running at low priority (4) will have a few of their resources
+     * preempted to meet that demand.
+     */
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 80 120 0]);" + // root
+            "-a(=[11 100 11 50 0]);" + // a
+            "-b(=[40 100 38 60 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[29 100 20 0 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,6,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(1,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,34,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(4,1,n1,,2,false,10);" + // app4 b
+            "b\t" // app4 in b
+            + "(5,1,n1,,1,false,10);" + // app5 b
+            "b\t" // app4 in b
+            + "(6,1,n1,,1,false,10);" + // app6 in b
+            "c\t" // app1 in a
+            + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+            + "(1,1,n1,,20,false,0)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, app3 and app4 are of lower priority. Hence take 8
+    // containers from them, within the intraQueuePreemptionDemand limit of 20%.
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(7)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNoPreemptionForSamePriorityApps() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * Guaranteed resources of a/b/c/d are 10:40:20:30. Total cluster
+     * resource = 100.
+     * Scenario: In queues A/B, all apps are running at the same priority, but
+     * there is also significant pending demand from these apps. Since all apps
+     * are at the same priority, preemption should not occur here.
+     */
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 80 120 0]);" + // root
+            "-a(=[10 100 10 50 0]);" + // a
+            "-b(=[40 100 40 60 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[30 100 20 0 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,6,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(1,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(1,1,n1,,34,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(1,1,n1,,2,false,10);" + // app4 b
+            "b\t" // app4 in b
+            + "(1,1,n1,,1,false,20);" + // app5 b
+            "b\t" // app4 in b
+            + "(1,1,n1,,1,false,10);" + // app6 in b
+            "c\t" // app1 in a
+            + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c
+            + "(1,1,n1,,20,false,0)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, none of the apps should be preempted.
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(5))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(6))));
+  }
+
+  @Test
+  public void testNoPreemptionWhenQueueIsUnderCapacityLimit()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 40:60. Total cluster resource = 100.
+     * By default, the minimum-threshold limit is 50%. Verify that there won't
+     * be any preemption, since used capacity is under 50% for queues a/b even
+     * though there is demand from high-priority apps in the queue.
+     */
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 35 80 0]);" + // root
+            "-a(=[40 100 10 50 0]);" + // a
+            "-b(=[60 100 25 30 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,40,false,20);" + // app3 b
+            "b\t" // app1 in a
+            + "(6,1,n1,,5,false,20)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue A/B, none of the apps should be preempted as used capacity
+    // is under 50%.
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+  @Test
+  public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Guaranteed resources of a/b are 40:60. Total cluster resource = 100.
+     * maxIntraQueuePreemptableLimit is set to 50% here (the default is 20%).
+     * This test verifies that preemption occurs only up to 50%, even though
+     * the demand is higher.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+            "-a(=[40 100 10 50 0]);" + // a
+            "-b(=[60 100 45 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,40,false,20);" + // app3 b
+            "b\t" // app1 in a
+            + "(6,1,n1,,5,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, even though app4 needs 100 resources, only 30 resources are
+    // preempted (the max is 50% of the guaranteed capacity of any queue,
+    // "maxIntraQueuePreemptable").
+    verify(mDisp, times(30)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 40:60. Total cluster resource = 100.
+     * Total preemption allowed per round is 10%. This test verifies that only
+     * 10% is preempted.
+     */
+
+    // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 0.1);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+            "-a(=[40 100 10 50 0]);" + // a
+            "-b(=[60 100 45 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,40,false,20);" + // app3 b
+            "b\t" // app1 in a
+            + "(6,1,n1,,5,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, even though app4 needs 100 resources, only 10 resources are
+    // preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testAlreadySelectedContainerFromInterQueuePreemption()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 60:40. Total cluster resource = 100.
+     * Queue B is underutilized, so queue A has to release 9 containers here.
+     * However, within queue A the high-priority app also has a demand for 20.
+     * So 11 more containers will be preempted, making a total of 20.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 95 170 0]);" + // root
+            "-a(=[60 100 70 50 0]);" + // a
+            "-b(=[40 100 25 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,50,false,15);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,20,false,20);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,20,false,20);" + // app3 b
+            "b\t" // app1 in a
+            + "(4,1,n1,,5,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // As per the intra-queue preemption algorithm, 20 more containers were
+    // needed for app2 (in queue a). Inter-queue preemption had already
+    // preselected 9 containers, hence only 11 more are preempted here.
+    verify(mDisp, times(20)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testSkipAMContainersInInterQueuePreemption() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 60:40. Total cluster resource = 100.
+     * While preempting containers during intra-queue preemption, AM
+     * containers are spared for now. Verify the same.
+     */
+
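+    // Expected arithmetic (informal): app3 (priority 2) has 20 pending in
+    // queue a. Lower-priority app1 (30 used) gives up 11 containers and app2
+    // (10 used) gives up 9, keeping one as its AM container: 11 + 9 = 20.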
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 170 0]);" + // root
+            "-a(=[60 100 60 50 0]);" + // a
+            "-b(=[40 100 40 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,30,false,10);" + "a\t" // app2 in a
+            + "(1,1,n1,,10,false,20);" + "a\t" // app3 in a
+            + "(2,1,n1,,20,false,20);" + "b\t" // app4 in b
+            + "(4,1,n1,,20,false,20);" + "b\t" // app5 in a
+            + "(4,1,n1,,20,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Ensure that 11 containers are preempted from app1 and only 9 from
+    // app2 (sparing app2's AM container)
+    verify(mDisp, times(11)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testSkipAMContainersInInterQueuePreemptionSingleApp()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 50:50. Total cluster resource = 100.
+     * Spare the AM container of a lower-priority app during its preemption
+     * cycle. Even though there is more demand and no other low-priority
+     * apps are present, the AM container still needs to be spared.
+     */
+
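+    // Expected arithmetic (informal): app2 (priority 2) has 10 pending and
+    // app1 is the only lower-priority app, holding 10 containers. Only 9 of
+    // them may be preempted; the last one is app1's AM and is spared.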
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 170 0]);" + // root
+            "-a(=[50 100 50 50 0]);" + // a
+            "-b(=[50 100 50 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,10,false,10);" + "a\t" // app1 in a
+            + "(2,1,n1,,40,false,10);" + "b\t" // app2 in a
+            + "(4,1,n1,,20,false,20);" + "b\t" // app3 in b
+            + "(4,1,n1,,30,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Make sure that app1's AM container is spared. Only 9 of its 10
+    // containers are preempted.
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testNoPreemptionForSingleApp() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 60:40. Total cluster resource = 100.
+     * Only one app is running in the queue. It has more demand, but no
+     * resources are available in the queue. Preemption must not occur here.
+     */
+
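+    // Informal reasoning: app1 is the only app in queue a, so there is no
+    // lower-priority app to take resources from and nothing may be selected.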
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 20 50 0]);" + // root
+            "-a(=[60 100 20 50 0]);" + // a
+            "-b(=[40 100 0 0 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(4,1,n1,,20,false,50)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Ensure there are 0 preemptions since only one app is running in the
+    // queue.
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
+
+  @Test
+  public void testOverutilizedQueueResourceWithInterQueuePreemption()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     * Scenario:
+     * Guaranteed resources of a/b are 20:80. Total cluster resource = 100.
+     * QueueB is under-utilized, so 20 resources will be released from
+     * queueA. 10 containers would also be selected for intra-queue
+     * preemption, but they are already pre-selected by inter-queue.
+     */
+
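+    // Expected arithmetic (informal): queue a uses 100 against a guarantee
+    // of 20 while queue b has 20 pending, so inter-queue preemption takes 20
+    // from app1. The containers intra-queue preemption would pick for app2
+    // are already part of that pre-selected set, so nothing extra is taken.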
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 70 0]);" + // root
+            "-a(=[20 100 100 30 0]);" + // a
+            "-b(=[80 100 0 20 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,50,false,0);" + "a\t" // app1 in a
+            + "(3,1,n1,,50,false,30);" + "b\t" // app2 in a
+            + "(4,1,n1,,0,false,20)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // The complete demand from QueueB, 20 resources, must be preempted.
+    verify(mDisp, times(20)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNodePartitionIntraQueuePreemption() throws IOException {
+    /**
+     * The simplest node-label test. Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Scenario:
+     * Both a/b can access x, and their guaranteed capacity is 50:50. Two
+     * nodes: n1 has 100 of x, n2 has 100 of NO_LABEL. Five applications in
+     * the cluster: app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x,
+     * app2 (priority 2) uses 0 x with 20 pending on x, app3 uses 50
+     * NO_LABEL, app4 uses 50 x, app5 uses 50 NO_LABEL.
+     *
+     * After preemption, 20 should be preempted from app1.
+     */
+
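+    // Expected arithmetic (informal): on partition x, app2 (priority 2) has
+    // 20 pending while lower-priority app1 holds 50 of x, so 20 containers
+    // are preempted from app1; apps on the default partition are untouched.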
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig = "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+            "-a(=[50 100 50 50],x=[50 100 50 50]);" + // a
+            "-b(=[50 100 50 50],x=[50 100 50 50])"; // b
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,x,50,false,10);" + // 50 * x in n1
+            "a\t" // app2 in a
+            + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
+            "a\t" // app2 in a
+            + "(1,1,n2,,50,false);" + // 50 default in n2
+            "b\t" // app3 in b
+            + "(1,1,n1,x,50,false);" + // 50 * x in n1
+            "b\t" // app4 in b
+            + "(1,1,n2,,50,false)"; // 50 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 20 preempted from app1
+    verify(mDisp, times(20))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, never())
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never())
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testComplexIntraQueuePreemption() throws IOException {
+    /**
+     * A complex preemption test. Queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b/c/d are 10:40:20:30. Total cluster
+     * resource = 100.
+     * All queues are under their guaranteed capacity, but within each queue
+     * there are many under-served applications.
+     */
+
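+    // Expected arithmetic (informal), mirroring the verifications below:
+    // queue a: low-priority app1 holds 5, so only 4 are preempted (its AM is
+    // spared); queue b: 4 + 4 + 9 = 17 come from app4/app5/app6 to serve the
+    // higher-priority demand; queue c: 2 from app10 and 1 from app11.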
+    // report "ideal" preempt as 50%. Ensure preemption happens only for 50%
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 0.5);
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 75 130 0]);" + // root
+            "-a(=[10 100 5 50 0]);" + // a
+            "-b(=[40 100 35 60 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[30 100 25 10 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t"
+            + "(4,1,n1,,0,false,25);" + // app2 a
+            "a\t"
+            + "(5,1,n1,,0,false,2);" + // app3 a
+            "b\t"
+            + "(3,1,n1,,5,false,20);" + // app4 b
+            "b\t"
+            + "(4,1,n1,,15,false,10);" + // app5 b
+            "b\t"
+            + "(4,1,n1,,10,false,10);" + // app6 b
+            "b\t"
+            + "(5,1,n1,,3,false,5);" + // app7 b
+            "b\t"
+            + "(5,1,n1,,0,false,2);" + // app8 b
+            "b\t"
+            + "(6,1,n1,,2,false,10);" + // app9 in b
+            "c\t"
+            + "(1,1,n1,,8,false,10);" + // app10 in c
+            "c\t"
+            + "(1,1,n1,,2,false,5);" + // app11 in c
+            "c\t"
+            + "(2,1,n1,,0,false,3);" + "d\t" // app12 in c
+            + "(2,1,n1,,25,false,0);" + "d\t" // app13 in d
+            + "(1,1,n1,,0,false,20)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // High-priority apps in queueA have a demand of ~30, but the low-priority
+    // app holds only 5 resources. Hence preempt only 4 here, sparing its AM.
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    // Multiple high-priority apps in queueB have a demand of 17, which will
+    // be preempted from a set of lower-priority apps.
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(6))));
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(5))));
+    // Only 3 resources are freed in this round for queue C, since AM
+    // containers are being spared.
+    verify(mDisp, times(2)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(10))));
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(11))));
+  }
+
+  @Test
+  public void testIntraQueuePreemptionWithTwoUsers()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 60:40. Total cluster resource = 100.
+     * Consider 2 users in a queue, and assume the minimum user limit factor
+     * is 50%. Hence in queueB, whose capacity is 40, each user has a quota
+     * of 20. app4, of higher priority, has a demand of 30 and is already
+     * using 5. Adhering to the user limit, only 15 more must be preempted;
+     * had both apps belonged to the same user, 20 would have been preempted.
+     */
+
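+    // Expected arithmetic (informal): per-user quota = 50% of queue b's 40
+    // = 20. app4 (user2) already uses 5, so it may gain at most 20 - 5 = 15,
+    // and only 15 containers are preempted from app3 despite a demand of 30.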
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+            "-a(=[60 100 10 50 0]);" + // a
+            "-b(=[40 100 40 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,35,false,20,user1);" + // app3 b
+            "b\t" // app4 in b
+            + "(6,1,n1,,5,false,30,user2)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Considering the 50% user limit with only 2 users present, preempt just
+    // 15 more (5 are already running) even though the demand is 30.
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testComplexNodePartitionIntraQueuePreemption()
+      throws IOException {
+    /**
+     * A more complex node-label test. Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Scenario:
+     * Both a/b can access x, and their guaranteed capacity is 50:50. Two
+     * nodes: n1 has 100 of x, n2 has 100 of NO_LABEL. Nine applications in
+     * the cluster: app1-app4 in a, and app5-app9 in b.
+     *
+     */
+
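+    // Expected arithmetic (informal): on partition x, app3 needs 20, so 16
+    // containers come from app1 and 4 from app2 (app2's AM is spared). On
+    // the default partition app9 needs 30, but the 50% limit caps preemption
+    // at 25: 22 from app6, 2 from app7 and 1 from app8.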
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig = "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+            "-a(=[50 100 50 50],x=[50 100 40 50]);" + // a
+            "-b(=[50 100 35 50],x=[50 100 50 50])"; // b
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,x,35,false,10);" + // 20 * x in n1
+            "a\t" // app2 in a
+            + "(1,1,n1,x,5,false,10);" + // 20 * x in n1
+            "a\t" // app3 in a
+            + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
+            "a\t" // app4 in a
+            + "(1,1,n2,,50,false);" + // 50 default in n2
+            "b\t" // app5 in b
+            + "(1,1,n1,x,50,false);" + // 50 * x in n1
+            "b\t" // app6 in b
+            + "(1,1,n2,,25,false);" + // 25 * default in n2
+            "b\t" // app7 in b
+            + "(1,1,n2,,3,false);" + // 3 * default in n2
+            "b\t" // app8 in b
+            + "(1,1,n2,,2,false);" + // 2 * default in n2
+            "b\t" // app9 in b
+            + "(5,1,n2,,5,false,30)"; // 50 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Label x: app3 has a demand of 20 on label x. Hence app2 loses 4
+    // containers (sparing its AM) and 16 more come from app1 until the
+    // preemption limit is met.
+    verify(mDisp, times(16))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(4))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+
+    // Default label: for a demand of 30, preempt from all lower-priority
+    // apps in the default partition. 25 will be preempted once the
+    // preemption limit is met.
+    verify(mDisp, times(1))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
+    verify(mDisp, times(2))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
+    verify(mDisp, times(22))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
+  }
+}

