hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvasu...@apache.org
Subject [02/52] [abbrv] hadoop git commit: YARN-5864. Capacity Scheduler - Queue Priorities. (wangda)
Date Mon, 30 Jan 2017 06:03:54 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
new file mode 100644
index 0000000..fe60611
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * For two queues with the same priority:
+ * - The queue with less relative used-capacity goes first - today’s behavior.
+ * - The default priority for all queues is 0 and equal. So, we get today’s
+ *   behaviour at every level - the queue with the lowest used-capacity
+ *   percentage gets the resources
+ *
+ * For two queues with different priorities:
+ * - Both the queues are under their guaranteed capacities: The queue with
+ *   the higher priority gets resources
+ * - Both the queues are over or meeting their guaranteed capacities:
+ *   The queue with the higher priority gets resources
+ * - One of the queues is over or meeting their guaranteed capacities and the
+ *   other is under: The queue that is under its capacity guarantee gets the
+ *   resources.
+ */
+public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPolicy {
+  private List<CSQueue> queues;
+  private boolean respectPriority;
+
+  // This makes multiple threads can sort queues at the same time
+  // For different partitions.
+  private static ThreadLocal<String> partitionToLookAt =
+      ThreadLocal.withInitial(new Supplier<String>() {
+        @Override
+        public String get() {
+          return RMNodeLabelsManager.NO_LABEL;
+        }
+      });
+
+  /**
+   * Compare two queues with possibly different priority and assigned capacity,
+   * Will be used by preemption policy as well.
+   *
+   * @param relativeAssigned1 relativeAssigned1
+   * @param relativeAssigned2 relativeAssigned2
+   * @param priority1 p1
+   * @param priority2 p2
+   * @return compared result
+   */
+  public static int compare(double relativeAssigned1, double relativeAssigned2,
+      int priority1, int priority2) {
+    if (priority1 == priority2) {
+      // The queue with less relative used-capacity goes first
+      return Double.compare(relativeAssigned1, relativeAssigned2);
+    } else {
+      // When priority is different:
+      if ((relativeAssigned1 < 1.0f && relativeAssigned2 < 1.0f) || (
+          relativeAssigned1 >= 1.0f && relativeAssigned2 >= 1.0f)) {
+        // When both the queues are under their guaranteed capacities,
+        // Or both the queues are over or meeting their guaranteed capacities
+        // queue with higher used-capacity goes first
+        return Integer.compare(priority2, priority1);
+      } else {
+        // Otherwise, when one of the queues is over or meeting their
+        // guaranteed capacities and the other is under: The queue that is
+        // under its capacity guarantee gets the resources.
+        return Double.compare(relativeAssigned1, relativeAssigned2);
+      }
+    }
+  }
+
+  /**
+   * Comparator that both looks at priority and utilization
+   */
+  private class PriorityQueueComparator implements  Comparator<CSQueue> {
+
+    @Override
+    public int compare(CSQueue q1, CSQueue q2) {
+      String p = partitionToLookAt.get();
+
+      int rc = compareQueueAccessToPartition(q1, q2, p);
+      if (0 != rc) {
+        return rc;
+      }
+
+      float used1 = q1.getQueueCapacities().getUsedCapacity(p);
+      float used2 = q2.getQueueCapacities().getUsedCapacity(p);
+      int p1 = 0;
+      int p2 = 0;
+      if (respectPriority) {
+        p1 = q1.getPriority().getPriority();
+        p2 = q2.getPriority().getPriority();
+      }
+
+      rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2);
+
+      // For queue with same used ratio / priority, queue with higher configured
+      // capacity goes first
+      if (0 == rc) {
+        float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p);
+        float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p);
+        return Float.compare(abs2, abs1);
+      }
+
+      return rc;
+    }
+
+    private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, String partition) {
+      // Everybody has access to default partition
+      if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) {
+        return 0;
+      }
+
+      /*
+       * Check accessible to given partition, if one queue accessible and
+       * the other not, accessible queue goes first.
+       */
+      boolean q1Accessible =
+          q1.getAccessibleNodeLabels() != null && q1.getAccessibleNodeLabels()
+              .contains(partition) || q1.getAccessibleNodeLabels().contains(
+              RMNodeLabelsManager.ANY);
+      boolean q2Accessible =
+          q2.getAccessibleNodeLabels() != null && q2.getAccessibleNodeLabels()
+              .contains(partition) || q2.getAccessibleNodeLabels().contains(
+              RMNodeLabelsManager.ANY);
+      if (q1Accessible && !q2Accessible) {
+        return -1;
+      } else if (!q1Accessible && q2Accessible) {
+        return 1;
+      }
+
+      return 0;
+    }
+  }
+
+  public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
+    this.respectPriority = respectPriority;
+  }
+
+  @Override
+  public void setQueues(List<CSQueue> queues) {
+    this.queues = queues;
+  }
+
+  @Override
+  public Iterator<CSQueue> getAssignmentIterator(String partition) {
+    // Since partitionToLookAt is a thread local variable, and every time we
+    // copy and sort queues, so it's safe for multi-threading environment.
+    PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
+    List<CSQueue> sortedQueue = new ArrayList<>(queues);
+    Collections.sort(sortedQueue, new PriorityQueueComparator());
+    return sortedQueue.iterator();
+  }
+
+  @Override
+  public String getConfigName() {
+    if (respectPriority) {
+      return CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY;
+    } else{
+      return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java
new file mode 100644
index 0000000..a434ab0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This will be used by
+ * {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue}
+ * to decide allocation ordering of child queues.
+ */
+public interface QueueOrderingPolicy {
+  void setQueues(List<CSQueue> queues);
+
+  /**
+   * Return an iterator over the collection of CSQueues which orders
+   * them for container assignment.
+   *
+   * Please note that, to avoid queue's set updated during sorting / iterating.
+   * Caller need to make sure parent queue's read lock is properly acquired.
+   *
+   * @param partition nodePartition
+   *
+   * @return iterator of queues to allocate
+   */
+  Iterator<CSQueue> getAssignmentIterator(String partition);
+
+  /**
+   * Returns configuration name (which will be used to set ordering policy
+   * @return configuration name
+   */
+  String getConfigName();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 4329335..5e3b9be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -98,13 +99,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   private final Set<ContainerId> containersToPreempt =
     new HashSet<ContainerId>();
-    
+
   private CapacityHeadroomProvider headroomProvider;
 
   private ResourceCalculator rc = new DefaultResourceCalculator();
 
   private ResourceScheduler scheduler;
-  
+
   private AbstractContainerAllocator containerAllocator;
 
   /**
@@ -115,7 +116,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   private Map<ContainerId, SchedContainerChangeRequest> toBeRemovedIncRequests =
       new ConcurrentHashMap<>();
 
-  public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
+  public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
@@ -831,7 +832,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
     return null;
   }
-  
+
   public void setHeadroomProvider(
     CapacityHeadroomProvider headroomProvider) {
     try {
@@ -841,7 +842,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       writeLock.unlock();
     }
   }
-  
+
   @Override
   public Resource getHeadroom() {
     try {
@@ -855,7 +856,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
 
   }
-  
+
   @Override
   public void transferStateFromPreviousAttempt(
       SchedulerApplicationAttempt appAttempt) {
@@ -867,7 +868,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       writeLock.unlock();
     }
   }
-  
+
   public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node,
       RMContainer rmContainer, Resource reservedResource) {
@@ -1148,4 +1149,85 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public boolean equals(Object o) {
     return super.equals(o);
   }
+
+  /**
+   * Move reservation from one node to another
+   * Comparing to unreserve container on source node and reserve a new
+   * container on target node. This method will not create new RMContainer
+   * instance. And this operation is atomic.
+   *
+   * @param reservedContainer to be moved reserved container
+   * @param sourceNode source node
+   * @param targetNode target node
+   *
+   * @return succeeded or not
+   */
+  public boolean moveReservation(RMContainer reservedContainer,
+      FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) {
+    try {
+      writeLock.lock();
+      if (!sourceNode.getPartition().equals(targetNode.getPartition())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Failed to move reservation, two nodes are in different partition");
+        }
+        return false;
+      }
+
+      // Update reserved container to node map
+      Map<NodeId, RMContainer> map = reservedContainers.get(
+          reservedContainer.getReservedSchedulerKey());
+      if (null == map) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot find reserved container map.");
+        }
+        return false;
+      }
+
+      // Check if reserved container changed
+      if (sourceNode.getReservedContainer() != reservedContainer) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("To-be-moved container already updated.");
+        }
+        return false;
+      }
+
+      // Check if target node is empty, acquires lock of target node to make sure
+      // reservation happens transactional
+      synchronized (targetNode){
+        if (targetNode.getReservedContainer() != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Target node is already occupied before moving");
+          }
+        }
+
+        try {
+          targetNode.reserveResource(this,
+              reservedContainer.getReservedSchedulerKey(), reservedContainer);
+        } catch (IllegalStateException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reserve on target node failed, e=", e);
+          }
+          return false;
+        }
+
+        // Set source node's reserved container to null
+        sourceNode.setReservedContainer(null);
+        map.remove(sourceNode.getNodeID());
+
+        // Update reserved container
+        reservedContainer.handle(
+            new RMContainerReservedEvent(reservedContainer.getContainerId(),
+                reservedContainer.getReservedResource(), targetNode.getNodeID(),
+                reservedContainer.getReservedSchedulerKey()));
+
+        // Add to target node
+        map.put(targetNode.getNodeID(), reservedContainer);
+
+        return true;
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index a09a33c..b2d7d16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -61,6 +62,7 @@ import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -476,6 +478,14 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
    * -B...
    * </pre>
    * ";" splits queues, and there should no empty lines, no extra spaces
+   *
+   * For each queue, it has configurations to specify capacities (to each
+   * partition), format is:
+   * <pre>
+   * -<queueName> (<labelName1>=[guaranteed max used pending], \
+   *               <labelName2>=[guaranteed max used pending])
+   *              {key1=value1,key2=value2};  // Additional configs
+   * </pre>
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private ParentQueue mockQueueHierarchy(String queueExprs) {
@@ -491,6 +501,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         queue = parentQueue;
         List<CSQueue> children = new ArrayList<CSQueue>();
         when(parentQueue.getChildQueues()).thenReturn(children);
+        QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
+        when(policy.getConfigName()).thenReturn(
+            CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+        when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
       } else {
         LeafQueue leafQueue = mock(LeafQueue.class);
         final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
@@ -625,11 +639,57 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
     when(queue.getPreemptionDisabled()).thenReturn(
         conf.getPreemptionDisabled(queuePath, false));
 
+    // Setup other queue configurations
+    Map<String, String> otherConfigs = getOtherConfigurations(
+        queueExprArray[idx]);
+    if (otherConfigs.containsKey("priority")) {
+      when(queue.getPriority()).thenReturn(
+          Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
+    } else {
+      // set queue's priority to 0 by default
+      when(queue.getPriority()).thenReturn(Priority.newInstance(0));
+    }
+
+    // Setup disable preemption of queues
+    if (otherConfigs.containsKey("disable_preemption")) {
+      when(queue.getPreemptionDisabled()).thenReturn(
+          Boolean.valueOf(otherConfigs.get("disable_preemption")));
+    }
+
     nameToCSQueues.put(queueName, queue);
     when(cs.getQueue(eq(queueName))).thenReturn(queue);
   }
 
   /**
+   * Get additional queue's configurations
+   * @param queueExpr queue expr
+   * @return maps of configs
+   */
+  private Map<String, String> getOtherConfigurations(String queueExpr) {
+    if (queueExpr.contains("{")) {
+      int left = queueExpr.indexOf('{');
+      int right = queueExpr.indexOf('}');
+
+      if (right > left) {
+        Map<String, String> configs = new HashMap<>();
+
+        String subStr = queueExpr.substring(left + 1, right);
+        for (String kv : subStr.split(",")) {
+          if (kv.contains("=")) {
+            String key = kv.substring(0, kv.indexOf("="));
+            String value = kv.substring(kv.indexOf("=") + 1);
+            configs.put(key, value);
+          }
+        }
+
+        return configs;
+      }
+    }
+
+    return Collections.EMPTY_MAP;
+  }
+
+  /**
    * Level of a queue is how many "-" at beginning, root's level is 0
    */
   private int getLevel(String q) {
@@ -739,6 +799,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
     Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
   }
 
+  public void checkPriority(CSQueue queue, int expectedPriority) {
+    Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
+  }
+
   public void checkReservedResource(CSQueue queue, String partition, int reserved) {
     ResourceUsage ru = queue.getQueueResourceUsage();
     Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
new file mode 100644
index 0000000..2b54d77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestPreemptionForQueueWithPriorities
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  @Test
+  public void testPreemptionForHighestPriorityUnderutilizedQueue()
+      throws IOException {
+    /**
+     * The simplest test of queue with priorities, Queue structure is:
+     *
+     * <pre>
+     *        root
+     *       / |  \
+     *      a  b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b/c=2
+     *
+     * So c will preempt more resource from a, till a reaches guaranteed
+     * resource.
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 40 50]){priority=1};" + // a
+            "-b(=[30 100 59 50]){priority=2};" + // b
+            "-c(=[40 100 1 25]){priority=2}";    // c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,40,false);" + // app1 in a
+        "b\t(1,1,n1,,59,false);" + // app2 in b
+        "c\t(1,1,n1,,1,false);";   // app3 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 preempted from app1, 15 preempted from app2, and nothing preempted
+    // from app3
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testPreemptionForLowestPriorityUnderutilizedQueue()
+      throws IOException {
+    /**
+     * Similar to above, make sure we can still make sure less utilized queue
+     * can get resource first regardless of priority.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     *        root
+     *       / |  \
+     *      a  b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b=2
+     * - c=0
+     *
+     * So c will preempt more resource from a, till a reaches guaranteed
+     * resource.
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 40 50]){priority=1};" + // a
+            "-b(=[30 100 59 50]){priority=2};" + // b
+            "-c(=[40 100 1 25]){priority=0}";    // c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+            "a\t(1,1,n1,,40,false);" + // app1 in a
+            "b\t(1,1,n1,,59,false);" + // app2 in b
+            "c\t(1,1,n1,,1,false);";   // app3 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 preempted from app1, 15 preempted from app2, and nothing preempted
+    // from app3
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testPreemptionWontHappenBetweenSatisfiedQueues()
+      throws IOException {
+    /**
+     * No preemption happen if a queue is already satisfied, regardless of
+     * priority
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     *        root
+     *       / |  \
+     *      a  b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b=1
+     * - c=2
+     *
+     * When c is satisfied, it will not preempt any resource from other queues
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 0 0]){priority=1};" + // a
+            "-b(=[30 100 40 50]){priority=1};" + // b
+            "-c(=[40 100 60 25]){priority=2}";   // c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t(1,1,n1,,40,false);" + // app1 in b
+        "c\t(1,1,n1,,60,false)"; // app2 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Nothing preempted
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets()
+      throws IOException {
+    /**
+     * When a cluster has different priorities, each priority has multiple
+     * queues, preemption policy should try to balance resource between queues
+     * with same priority by ratio of their capacities
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     * root
+     * - a (capacity=10), p=1
+     * - b (capacity=15), p=1
+     * - c (capacity=20), p=2
+     * - d (capacity=25), p=2
+     * - e (capacity=30), p=2
+     * </pre>
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[10 100 35 50]){priority=1};" + // a
+            "-b(=[15 100 25 50]){priority=1};" + // b
+            "-c(=[20 100 39 50]){priority=2};" + // c
+            "-d(=[25 100 0 0]){priority=2};" + // d
+            "-e(=[30 100 1 99]){priority=2}";   // e
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,35,false);" + // app1 in a
+        "b\t(1,1,n1,,25,false);" + // app2 in b
+            "c\t(1,1,n1,,39,false);" + // app3 in c
+            "e\t(1,1,n1,,1,false)"; // app4 in e
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 23 preempted from app1, 6 preempted from app2, and nothing preempted
+    // from app3/app4
+    // (After preemption, a has 35 - 23 = 12, b has 25 - 6 = 19, so a:b after
+    //  preemption is 1.58, close to 1.50)
+    verify(mDisp, times(23)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(6)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+  @Test
+  public void testPreemptionForPriorityAndDisablePreemption()
+      throws IOException {
+    /**
+     * When a cluster has different priorities, each priority has multiple
+     * queues, preemption policy should try to balance resource between queues
+     * with same priority by ratio of their capacities.
+     *
+     * But also we need to make sure preemption disable will be honered
+     * regardless of priority.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     * root
+     * - a (capacity=10), p=1
+     * - b (capacity=15), p=1
+     * - c (capacity=20), p=2
+     * - d (capacity=25), p=2
+     * - e (capacity=30), p=2
+     * </pre>
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a
+            "-b(=[15 100 25 50]){priority=1};" + // b
+            "-c(=[20 100 39 50]){priority=2};" + // c
+            "-d(=[25 100 0 0]){priority=2};" + // d
+            "-e(=[30 100 1 99]){priority=2}";   // e
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,35,false);" + // app1 in a
+            "b\t(1,1,n1,,25,false);" + // app2 in b
+            "c\t(1,1,n1,,39,false);" + // app3 in c
+            "e\t(1,1,n1,,1,false)"; // app4 in e
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // We suppose to preempt some resource from A, but now since queueA
+    // disables preemption, so we need to preempt some resource from B and
+    // some from C even if C has higher priority than A
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+  @Test
+  public void testPriorityPreemptionForHierarchicalOfQueues()
+      throws IOException {
+    /**
+     * When a queue has multiple hierarchy and different priorities:
+     *
+     * <pre>
+     * root
+     * - a (capacity=30), p=1
+     *   - a1 (capacity=40), p=1
+     *   - a2 (capacity=60), p=1
+     * - b (capacity=30), p=1
+     *   - b1 (capacity=50), p=1
+     *   - b1 (capacity=50), p=2
+     * - c (capacity=40), p=2
+     * </pre>
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 40 50]){priority=1};" + // a
+            "--a1(=[12 100 20 50]){priority=1};" + // a1
+            "--a2(=[18 100 20 50]){priority=1};" + // a2
+            "-b(=[30 100 59 50]){priority=1};" + // b
+            "--b1(=[15 100 30 50]){priority=1};" + // b1
+            "--b2(=[15 100 29 50]){priority=2};" + // b2
+            "-c(=[40 100 1 30]){priority=1}";   // c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t(1,1,n1,,20,false);" + // app1 in a1
+            "a2\t(1,1,n1,,20,false);" + // app2 in a2
+            "b1\t(1,1,n1,,30,false);" + // app3 in b1
+            "b2\t(1,1,n1,,29,false);" + // app4 in b2
+            "c\t(1,1,n1,,29,false)"; // app5 in c
+
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Preemption should first divide capacities between a / b, and b2 should
+    // get less preemption than b1 (because b2 has higher priority)
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 7eca34f..a14a2b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -220,7 +221,9 @@ public class TestProportionalCapacityPreemptionPolicy {
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // A will preempt guaranteed-allocated.
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
   
   @Test
@@ -588,8 +591,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-    // correct imbalance between over-capacity queues
-    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    // Will not preempt for over capacity queues
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
   @Test
@@ -702,7 +705,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   public void testZeroGuarOverCap() {
     int[][] qData = new int[][] {
       //  /    A   B   C    D   E   F
-         { 200, 100, 0, 99, 0, 100, 100 },  // abs
+         { 200, 100, 0, 100, 0, 100, 100 },  // abs
         { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
         { 170,  170, 60, 20, 90, 0,  0 },  // used
         {  85,   50,  30,  10,  10,  20, 20 },  // pending
@@ -713,14 +716,14 @@ public class TestProportionalCapacityPreemptionPolicy {
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-    // we verify both that C has priority on B and D (has it has >0 guarantees)
-    // and that B and D are force to share their over capacity fairly (as they
-    // are both zero-guarantees) hence D sees some of its containers preempted
-    verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    // No preemption should happen because zero guaranteed queues should be
+    // treated as always satisfied, they should not preempt from each other.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD)));
   }
   
-  
-  
   @Test
   public void testHierarchicalLarge() {
     int[][] qData = new int[][] {
@@ -1232,6 +1235,13 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(pq.getChildQueues()).thenReturn(cqs);
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     when(pq.getReadLock()).thenReturn(lock.readLock());
+
+    // Ordering policy
+    QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
+    when(policy.getConfigName()).thenReturn(
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+    when(pq.getQueueOrderingPolicy()).thenReturn(policy);
+    when(pq.getPriority()).thenReturn(Priority.newInstance(0));
     for (int i = 0; i < subqueues; ++i) {
       pqs.add(pq);
     }
@@ -1302,6 +1312,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     }
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     when(lq.getReadLock()).thenReturn(lock.readLock());
+    when(lq.getPriority()).thenReturn(Priority.newInstance(0));
     p.getChildQueues().add(lq);
     return lq;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e31a889..1fd455a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -95,7 +95,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
   }
 
   @Test
-  public void testNodePartitionPreemptionRespectMaximumCapacity()
+  public void testNodePartitionPreemptionNotHappenBetweenSatisfiedQueues()
       throws IOException {
     /**
      * Queue structure is:
@@ -114,8 +114,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
      * 2 apps in cluster.
      * app1 in b and app2 in c.
      *
-     * app1 uses 90x, and app2 use 10x. After preemption, app2 will preempt 10x
-     * from app1 because of max capacity.
+     * app1 uses 90x, and app2 use 10x. We don't expect preemption happen
+     * between them because all of them are satisfied
      */
     String labelsConfig =
         "=100,true;" + // default partition
@@ -139,9 +139,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    // 30 preempted from app1, 30 preempted from app4, and nothing preempted
-    // from app2/app3
-    verify(mDisp, times(20)).handle(
+    // No preemption happens
+    verify(mDisp, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
     verify(mDisp, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 07d1eef..964a230 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -46,8 +46,8 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
         "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
             "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
             "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
-            "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
-            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+            "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]){priority=2};" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0]){priority=1,disable_preemption=true}";
     String appsConfig=
         //queueName\t(priority,resource,host,expression,#repeat,reserved)
         // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
@@ -75,6 +75,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("root"), "red", 100);
     checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
     checkPendingResource(cs.getQueue("root"), "blue", 200);
+    checkPriority(cs.getQueue("root"), 0); // default
 
     // a
     checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
@@ -83,6 +84,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("a"), "red", 0);
     checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
     checkPendingResource(cs.getQueue("a"), "blue", 200);
+    checkPriority(cs.getQueue("a"), 0); // default
 
     // a1
     checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
@@ -91,6 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("a1"), "red", 0);
     checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
     checkPendingResource(cs.getQueue("a1"), "blue", 0);
+    checkPriority(cs.getQueue("a1"), 0); // default
 
     // a2
     checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
@@ -99,14 +102,18 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("a2"), "red", 0);
     checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
     checkPendingResource(cs.getQueue("a2"), "blue", 200);
+    checkPriority(cs.getQueue("a2"), 2);
+    Assert.assertFalse(cs.getQueue("a2").getPreemptionDisabled());
 
-    // b1
+    // b
     checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
     checkPendingResource(cs.getQueue("b"), "", 0);
     checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
     checkPendingResource(cs.getQueue("b"), "red", 100);
     checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
     checkPendingResource(cs.getQueue("b"), "blue", 0);
+    checkPriority(cs.getQueue("b"), 1);
+    Assert.assertTrue(cs.getQueue("b").getPreemptionDisabled());
 
     // Check ignored partitioned containers in queue
     Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index bd9f615..943b7d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -46,7 +46,7 @@ public class CapacitySchedulerPreemptionTestBase {
 
   final int GB = 1024;
 
-  Configuration conf;
+  CapacitySchedulerConfiguration conf;
 
   RMNodeLabelsManager mgr;
 
@@ -54,13 +54,15 @@ public class CapacitySchedulerPreemptionTestBase {
 
   @Before
   void setUp() throws Exception {
-    conf = new YarnConfiguration();
+    conf = new CapacitySchedulerConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
     conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
-    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+    conf = (CapacitySchedulerConfiguration) TestUtils
+        .getConfigurationWithMultipleQueues(this.conf);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB);
 
     // Set preemption related configurations
     conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
@@ -146,4 +148,18 @@ public class CapacitySchedulerPreemptionTestBase {
 
     Assert.fail();
   }
+
+  public void checkNumberOfPreemptionCandidateFromApp(
+      ProportionalCapacityPreemptionPolicy policy, int expected,
+      ApplicationAttemptId attemptId) {
+    int total = 0;
+
+    for (RMContainer rmContainer : policy.getToPreemptContainers().keySet()) {
+      if (rmContainer.getApplicationAttemptId().equals(attemptId)) {
+        ++ total;
+      }
+    }
+
+    Assert.assertEquals(expected, total);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 7382f3d..046ea4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -110,9 +110,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
@@ -276,9 +273,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 16));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -581,9 +575,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index 1f87c53..2fa06e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -594,9 +594,6 @@ public class TestApplicationLimitsByPartition {
         .thenReturn(Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability())
         .thenReturn(Resources.createResource(16 * GB));
-    when(csContext.getNonPartitionedQueueComparator())
-        .thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     RMContext rmContext = TestUtils.getMockRMContext();
     RMContext spyRMContext = spy(rmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index db6115c..5989da0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -22,22 +22,26 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Set;
 
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
@@ -167,8 +171,7 @@ public class TestCapacitySchedulerSurgicalPreemption
      *
      * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
      *
-     * 2) app1 submit to queue-a first, it asked 38 * 1G containers
-     * We will allocate 20 on n1 and 19 on n2.
+     * 2) app1 submit to queue-b, asks for 1G * 5
      *
      * 3) app2 submit to queue-c, ask for one 4G container (for AM)
      *
@@ -243,4 +246,569 @@ public class TestCapacitySchedulerSurgicalPreemption
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 submit to queue-b first, it asked 6 * 1G containers
+     * We will allocate 4 on n1 (including AM) and 3 on n2.
+     *
+     * 3) app2 submit to queue-c, ask for one 18G container (for AM)
+     *
+     * After preemption, we should expect:
+     * Preempt 3 containers from app1 and AM of app2 successfully allocated.
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, so the abs-used-cap of b is
+    // 7 / 40 = 17.5% < 20% (guaranteed)
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    // 4 from n1 and 3 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 4);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 3);
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    // Call editSchedule immediately: containers are not selected
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Sleep the timeout interval, we should be able to see containers selected
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed, and new AM
+    // container launched
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testPriorityPreemptionRequiresMoveReservation()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 3 nodes in the cluster, 10G for each
+     *
+     * 2) app1 submit to queue-b first, it asked 2G each,
+     *    it can get 2G on n1 (AM), 2 * 2G on n2
+     *
+     * 3) app2 submit to queue-c, with 2G AM container (allocated on n3)
+     *    app2 requires 9G resource, which will be reserved on n3
+     *
+     * We should expect container unreserved from n3 and allocated on n1/n2
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 2 * GB, 2, new ArrayList<>());
+
+    // Do allocation for node2 twice
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+
+    // 1 from n1 and 2 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 1);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 2);
+
+    // Submit app2 to queue-c and asks for a 2G container for AM, on n3
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Asks 1 * 9G container
+    am2.allocate("*", 9 * GB, 1, new ArrayList<>());
+
+    // Do allocation for node3 once
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
+
+    // Make sure container reserved on node3
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Call editSchedule immediately: nothing happens
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Sleep the timeout interval, we should be able to see reserved container
+    // moved to n2 (n1 occupied by AM)
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+    Assert.assertNotNull(
+        cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+    Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(
+        rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());
+
+    // Do it again, we should see containers marked to be preempt
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (schedulerApp2.getLiveContainers().size() < 2) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      Thread.sleep(200);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G.
+     *
+     * 2) app1 submit to queue-b first, it asked 8 * 1G containers
+     * We will allocate 1 container on each of n0-n10
+     *
+     * 3) app2 submit to queue-c, ask for 10 * 10G containers (including AM)
+     *
+     * After preemption, we should expect:
+     * Preempt 7 containers from app1 and usage of app2 is 70%
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[10];
+    for (int i = 0; i < 10; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode[] rmNodes = new RMNode[10];
+    for (int i = 0; i < 10; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+    }
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);
+
+    am1.allocate("*", 1 * GB, 8, new ArrayList<>());
+
+    // Do allocation for nm1-nm8
+    for (int i = 1; i < 9; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 9 containers now, so the abs-used-cap of b is 9%
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(9, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < 9; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
+          am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue-c and asks for a 10G container for AM
+    // Launch AM in NM9
+    RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask 10 * 10GB containers
+    am2.allocate("*", 10 * GB, 10, new ArrayList<>());
+
+    // Do allocation for all nms
+    for (int i = 1; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am2 reserved resource from nm1-nm9
+    for (int i = 1; i < 9; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+    }
+
+    // Sleep the timeout interval, we should be able to see 6 containers selected
+    // 6 (selected) + 1 (allocated) which makes target capacity to 70%
+    Thread.sleep(1000);
+
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
+        am1.getApplicationAttemptId());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    // Do allocation for all nms
+    for (int i = 1; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 600000)
+  public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          45  45  10
+     * </pre>
+     *
+     * Priority of queue_a = 1
+     * Priority of queue_b = 2
+     *
+     * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G.
+     *
+     * 2) app1 submit to queue-c first (AM=1G), it asked 4 * 1G containers
+     *    We will allocate 1 container on each of n0-n4. AM on n4.
+     *
+     * 3) app2 submit to queue-a, AM container=0.5G, allocated on n0
+     *    Ask for 2 * 3.5G containers. (Reserved on n0/n1)
+     *
+     * 4) app2 submit to queue-b, AM container=0.5G, allocated on n2
+     *    Ask for 2 * 3.5G containers. (Reserved on n2/n3)
+     *
+     * First we will preempt container on n2 since it is the oldest container of
+     * Highest priority queue (b)
+     */
+
+    // Total preemption = 1G per round, which is 5% of cluster resource (20G)
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        0.05f);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // A/B has higher priority
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1);
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[5];
+    for (int i = 0; i < 5; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode[] rmNodes = new RMNode[5];
+    for (int i = 0; i < 5; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+    }
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
+
+    am1.allocate("*", 1 * GB, 4, new ArrayList<>());
+
+    // Do allocation for nm1-nm8
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 5 containers now, one for each node
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < 5; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
+          am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue-a and asks for a 0.5G container for AM (on n0)
+    RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask 2 * 3.5GB containers
+    am2.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
+
+    // Do allocation for n0-n1
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am2 reserved resource from nm0-nm1
+    for (int i = 0; i < 2; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+      Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+          .getReservedContainer().getQueueName(), "a");
+    }
+
+    // Submit app3 to queue-b and asks for a 0.5G container for AM (on n2)
+    RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
+    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
+
+    // Ask 2 * 3.5GB containers
+    am3.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
+
+    // Do allocation for n2-n3
+    for (int i = 2; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am2 reserved resource from nm2-nm3
+    for (int i = 2; i < 4; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+      Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+          .getReservedContainer().getQueueName(), "b");
+    }
+
+    // Sleep the timeout interval, we should be able to see 1 container selected
+    Thread.sleep(1000);
+
+    /* 1st container preempted is on n2 */
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[2]
+    Set<RMContainer> selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[2].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 2);
+
+    /* 2nd container preempted is on n3 */
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[3]
+    selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[3].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+    /* 3rd container preempted is on n0 */
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[0]
+    selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[0].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 2);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+    /* 4th container preempted is on n1 */
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[0]
+    selectedToPreempt =
+        editPolicy.getToPreemptContainers().keySet();
+    Assert.assertEquals(1, selectedToPreempt.size());
+    Assert.assertEquals(mockNMs[1].getNodeId(),
+        selectedToPreempt.iterator().next().getAllocatedNode());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+    // Do allocation for all nms
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 3);
+    waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
+
+    rm1.close();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 3c1f676..899523c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -99,9 +99,6 @@ public class TestChildQueueOrder {
         Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceComparator);
     when(csContext.getRMContext()).thenReturn(rmContext);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message