hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [3/3] hadoop git commit: YARN-5864. Capacity Scheduler - Queue Priorities. (wangda)
Date Tue, 24 Jan 2017 22:57:21 GMT
YARN-5864. Capacity Scheduler - Queue Priorities. (wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1309accd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1309accd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1309accd

Branch: refs/heads/branch-2
Commit: 1309accd687446b2bc13d89b08044628d4cf7b36
Parents: 1672a06
Author: Wangda Tan <wangda@apache.org>
Authored: Tue Jan 24 14:44:42 2017 -0800
Committer: Wangda Tan <wangda@apache.org>
Committed: Tue Jan 24 14:44:42 2017 -0800

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   4 -
 .../AbstractPreemptableResourceCalculator.java  |  14 +-
 .../capacity/PreemptionCandidatesSelector.java  |   3 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  37 ++
 ...QueuePriorityContainerCandidateSelector.java | 510 +++++++++++++++++
 .../monitor/capacity/TempQueuePerPartition.java |  32 +-
 .../monitor/capacity/TempSchedulerNode.java     | 120 ++++
 .../rmcontainer/RMContainerImpl.java            |   7 +-
 .../scheduler/SchedulerNode.java                |   2 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  10 +
 .../scheduler/capacity/CSQueue.java             |   7 +
 .../scheduler/capacity/CapacityScheduler.java   |  75 ++-
 .../CapacitySchedulerConfiguration.java         | 236 +++++++-
 .../capacity/CapacitySchedulerContext.java      |   4 -
 .../capacity/CapacitySchedulerQueueManager.java |   3 -
 .../scheduler/capacity/LeafQueue.java           |   4 +-
 .../scheduler/capacity/ParentQueue.java         |  70 ++-
 .../capacity/PartitionedQueueComparator.java    |  72 ---
 .../PriorityUtilizationQueueOrderingPolicy.java | 185 ++++++
 .../capacity/policy/QueueOrderingPolicy.java    |  52 ++
 .../scheduler/common/fica/FiCaSchedulerApp.java |  96 +++-
 ...alCapacityPreemptionPolicyMockFramework.java |  65 +++
 .../TestPreemptionForQueueWithPriorities.java   | 361 ++++++++++++
 ...estProportionalCapacityPreemptionPolicy.java |  31 +-
 ...pacityPreemptionPolicyForNodePartitions.java |  11 +-
 ...alCapacityPreemptionPolicyMockFramework.java |  13 +-
 .../CapacitySchedulerPreemptionTestBase.java    |  22 +-
 .../capacity/TestApplicationLimits.java         |   9 -
 .../TestApplicationLimitsByPartition.java       |   3 -
 ...TestCapacitySchedulerSurgicalPreemption.java | 572 ++++++++++++++++++-
 .../scheduler/capacity/TestChildQueueOrder.java |   3 -
 .../capacity/TestContainerAllocation.java       | 111 ++++
 .../scheduler/capacity/TestLeafQueue.java       |  13 +-
 .../scheduler/capacity/TestParentQueue.java     |   3 -
 .../scheduler/capacity/TestReservations.java    |   2 -
 ...tPriorityUtilizationQueueOrderingPolicy.java | 222 +++++++
 .../policy/TestFairOrderingPolicy.java          |  11 +-
 37 files changed, 2764 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index a2c5562..1a86e03 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -166,10 +166,6 @@
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" />
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
-  <Match>
-    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
-    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
-  </Match>
   <!-- Ignore some irrelevant class name warning -->
   <Match>
     <Class name="org.apache.hadoop.yarn.api.records.SerializedException" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 8255a30..a80f317 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -49,13 +50,11 @@ public class AbstractPreemptableResourceCalculator {
 
     @Override
     public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
-      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
-        return -1;
-      }
-      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
-        return 1;
-      }
-      return 0;
+      double assigned1 = getIdealPctOfGuaranteed(tq1);
+      double assigned2 = getIdealPctOfGuaranteed(tq2);
+
+      return PriorityUtilizationQueueOrderingPolicy.compare(assigned1,
+          assigned2, tq1.relativePriority, tq2.relativePriority);
     }
 
     // Calculates idealAssigned / guaranteed
@@ -156,6 +155,7 @@ public class AbstractPreemptableResourceCalculator {
       // way, the most underserved queue(s) are always given resources first.
       Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
           orderedByNeed, tqComparator);
+
       for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
           .hasNext();) {
         TempQueuePerPartition sub = i.next();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index b48a287..4d8afaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -48,7 +48,8 @@ public abstract class PreemptionCandidatesSelector {
    * @param selectedCandidates already selected candidates from previous policies
    * @param clusterResource total resource
    * @param totalPreemptedResourceAllowed how many resources allowed to be
-   *                                      preempted in this round
+   *                                      preempted in this round. Should be
+   *                                      updated (in-place set) after the call
    * @return merged selected candidates.
    */
   public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 324e845..3bf6994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -193,6 +194,14 @@ public class ProportionalCapacityPreemptionPolicy
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();
 
+    // Do we need the queue-priority preemption policy?
+    boolean isQueuePriorityPreemptionEnabled =
+        csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled();
+    if (isQueuePriorityPreemptionEnabled) {
+      candidatesSelectionPolicies.add(
+          new QueuePriorityContainerCandidateSelector(this));
+    }
+
     // Do we need to specially consider reserved containers?
     boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
         CapacitySchedulerConfiguration.
@@ -352,6 +361,8 @@ public class ProportionalCapacityPreemptionPolicy
                 .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
             partitionToLookAt);
       }
+
+      // Update effective priority of queues
     }
 
     this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames(
@@ -368,13 +379,28 @@ public class ProportionalCapacityPreemptionPolicy
         new HashMap<>();
     for (PreemptionCandidatesSelector selector :
         candidatesSelectionPolicies) {
+      long startTime = 0;
       if (LOG.isDebugEnabled()) {
         LOG.debug(MessageFormat
             .format("Trying to use {0} to select preemption candidates",
                 selector.getClass().getName()));
+        startTime = clock.getTime();
       }
       toPreempt = selector.selectCandidates(toPreempt,
           clusterResources, totalPreemptionAllowed);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(MessageFormat
+            .format("{0} uses {1} millisecond to run",
+                selector.getClass().getName(), clock.getTime() - startTime));
+        int totalSelected = 0;
+        for (Set<RMContainer> set : toPreempt.values()) {
+          totalSelected += set.size();
+        }
+        LOG.debug(MessageFormat
+            .format("So far, total {0} containers selected to be preempted",
+                totalSelected));
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -470,11 +496,22 @@ public class ProportionalCapacityPreemptionPolicy
           reserved, curQueue);
 
       if (curQueue instanceof ParentQueue) {
+        String configuredOrderingPolicy =
+            ((ParentQueue) curQueue).getQueueOrderingPolicy().getConfigName();
+
         // Recursively add children
         for (CSQueue c : curQueue.getChildQueues()) {
           TempQueuePerPartition subq = cloneQueues(c, partitionResource,
               partitionToLookAt);
+
+          // If we respect priority
+          if (StringUtils.equals(
+              CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+              configuredOrderingPolicy)) {
+            subq.relativePriority = c.getPriority().getPriority();
+          }
           ret.addChild(subq);
+          subq.parent = ret;
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
new file mode 100644
index 0000000..8e59053
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class QueuePriorityContainerCandidateSelector
+    extends PreemptionCandidatesSelector {
+  private static final Log LOG =
+      LogFactory.getLog(QueuePriorityContainerCandidateSelector.class);
+
+  // Configured timeout before doing reserved container preemption
+  private long minTimeout;
+
+  // Allow move reservation around for better placement?
+  private boolean allowMoveReservation;
+
+  // All the reserved containers of the system which could possibly preempt from
+  // queues with lower priorities
+  private List<RMContainer> reservedContainers;
+
+  // From -> To
+  // A digraph to represent if one queue has higher priority than another.
+  // For example, a->b means queue=a has higher priority than queue=b
+  private Table<String, String, Boolean> priorityDigraph =
+      HashBasedTable.create();
+
+  private Resource clusterResource;
+  private Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates;
+  private Resource totalPreemptionAllowed;
+
+  // A cached scheduler node map, will be refreshed each round.
+  private Map<NodeId, TempSchedulerNode> tempSchedulerNodeMap = new HashMap<>();
+
+  // Have we touched (make any changes to the node) for this round
+  // Once a node is touched, we will not try to move reservations to the node
+  private Set<NodeId> touchedNodes;
+
+  // Resources which are marked to be preempted from other queues.
+  // <Queue, Partition, Resource-marked-to-be-preempted-from-other-queue>
+  private Table<String, String, Resource> toPreemptedFromOtherQueues =
+      HashBasedTable.create();
+
+  private final Comparator<RMContainer>
+      CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>() {
+    @Override
+    public int compare(RMContainer o1, RMContainer o2) {
+      if (preemptionAllowed(o1.getQueueName(), o2.getQueueName())) {
+        return -1;
+      } else if (preemptionAllowed(o2.getQueueName(), o1.getQueueName())) {
+        return 1;
+      }
+
+      // If two queues cannot preempt each other, compare creation time.
+      return Long.compare(o1.getCreationTime(), o2.getCreationTime());
+    }
+  };
+
+  QueuePriorityContainerCandidateSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+
+    // Initialize parameters
+    CapacitySchedulerConfiguration csc =
+        preemptionContext.getScheduler().getConfiguration();
+
+    minTimeout = csc.getPUOrderingPolicyUnderUtilizedPreemptionDelay();
+    allowMoveReservation =
+        csc.getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation();
+  }
+
+  private List<TempQueuePerPartition> getPathToRoot(TempQueuePerPartition tq) {
+    List<TempQueuePerPartition> list = new ArrayList<>();
+    while (tq != null) {
+      list.add(tq);
+      tq = tq.parent;
+    }
+    return list;
+  }
+
+  private void intializePriorityDigraph() {
+    LOG.info("Initializing priority preemption directed graph:");
+
+    // Make sure we iterate all leaf queue combinations
+    for (String q1 : preemptionContext.getLeafQueueNames()) {
+      for (String q2 : preemptionContext.getLeafQueueNames()) {
+        // Make sure we only calculate each combination once instead of all
+        // permutations
+        if (q1.compareTo(q2) < 0) {
+          TempQueuePerPartition tq1 = preemptionContext.getQueueByPartition(q1,
+              RMNodeLabelsManager.NO_LABEL);
+          TempQueuePerPartition tq2 = preemptionContext.getQueueByPartition(q2,
+              RMNodeLabelsManager.NO_LABEL);
+
+          List<TempQueuePerPartition> path1 = getPathToRoot(tq1);
+          List<TempQueuePerPartition> path2 = getPathToRoot(tq2);
+
+          // Get direct ancestor below LCA (Lowest common ancestor)
+          int i = path1.size() - 1;
+          int j = path2.size() - 1;
+          while (path1.get(i).queueName.equals(path2.get(j).queueName)) {
+            i--;
+            j--;
+          }
+
+          // compare priority of path1[i] and path2[j]
+          int p1 = path1.get(i).relativePriority;
+          int p2 = path2.get(j).relativePriority;
+          if (p1 < p2) {
+            priorityDigraph.put(q2, q1, true);
+            if (LOG.isDebugEnabled()) {
+              LOG.info("- Added priority ordering edge: " + q2 + " >> " + q1);
+            }
+          } else if (p2 < p1) {
+            priorityDigraph.put(q1, q2, true);
+            if (LOG.isDebugEnabled()) {
+              LOG.info("- Added priority ordering edge: " + q1 + " >> " + q2);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Do we allow demandingQueue to preempt resources from toBePreemptedQueue?
+   *
+   * @param demandingQueue demandingQueue
+   * @param toBePreemptedQueue toBePreemptedQueue
+   * @return can/cannot
+   */
+  private boolean preemptionAllowed(String demandingQueue,
+      String toBePreemptedQueue) {
+    return priorityDigraph.contains(demandingQueue,
+        toBePreemptedQueue);
+  }
+
+  /**
+   * Can we preempt enough resource for given:
+   *
+   * @param requiredResource askedResource
+   * @param demandingQueue demandingQueue
+   * @param schedulerNode node
+   * @param lookingForNewReservationPlacement Are we trying to look for move
+   *        reservation to the node
+   * @param newlySelectedContainers newly selected containers, will be set when
+   *        we can preempt enough resources from the node.
+   *
+   * @return can/cannot
+   */
+  private boolean canPreemptEnoughResourceForAsked(Resource requiredResource,
+      String demandingQueue, FiCaSchedulerNode schedulerNode,
+      boolean lookingForNewReservationPlacement,
+      List<RMContainer> newlySelectedContainers) {
+    // Do not check touched nodes again.
+    if (touchedNodes.contains(schedulerNode.getNodeID())) {
+      return false;
+    }
+
+    TempSchedulerNode node = tempSchedulerNodeMap.get(schedulerNode.getNodeID());
+    if (null == node) {
+      node = TempSchedulerNode.fromSchedulerNode(schedulerNode);
+      tempSchedulerNodeMap.put(schedulerNode.getNodeID(), node);
+    }
+
+    if (null != schedulerNode.getReservedContainer()
+        && lookingForNewReservationPlacement) {
+      // Node reserved by the others, skip this node
+      // We will not try to move the reservation to node which reserved already.
+      return false;
+    }
+
+    // Need to preempt = asked - (node.total - node.allocated)
+    Resource lacking = Resources.subtract(requiredResource, Resources
+        .subtract(node.getTotalResource(), node.getAllocatedResource()));
+
+    // On each host, simply check if we could preempt containers from
+    // lower-prioritized queues or not
+    List<RMContainer> runningContainers = node.getRunningContainers();
+    Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR);
+
+    // First of all, consider already selected containers
+    for (RMContainer runningContainer : runningContainers) {
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
+          runningContainer, selectedCandidates)) {
+        Resources.subtractFrom(lacking,
+            runningContainer.getAllocatedResource());
+      }
+    }
+
+    // If we already can allocate the reserved container after preemption,
+    // skip following steps
+    if (Resources.fitsIn(rc, clusterResource, lacking,
+        Resources.none())) {
+      return true;
+    }
+
+    Resource allowed = Resources.clone(totalPreemptionAllowed);
+    Resource selected = Resources.createResource(0);
+
+    for (RMContainer runningContainer : runningContainers) {
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
+          runningContainer, selectedCandidates)) {
+        // ignore selected containers
+        continue;
+      }
+
+      // Only preempt resource from queue with lower priority
+      if (!preemptionAllowed(demandingQueue,
+          runningContainer.getQueueName())) {
+        continue;
+      }
+
+      // Don't preempt AM container
+      if (runningContainer.isAMContainer()) {
+        continue;
+      }
+
+      // Not allow to preempt more than limit
+      if (Resources.greaterThanOrEqual(rc, clusterResource, allowed,
+          runningContainer.getAllocatedResource())) {
+        Resources.subtractFrom(allowed,
+            runningContainer.getAllocatedResource());
+        Resources.subtractFrom(lacking,
+            runningContainer.getAllocatedResource());
+        Resources.addTo(selected, runningContainer.getAllocatedResource());
+
+        if (null != newlySelectedContainers) {
+          newlySelectedContainers.add(runningContainer);
+        }
+      }
+
+      // Lacking <= 0 means we can allocate the reserved container
+      if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private boolean preChecksForMovingReservedContainerToNode(
+      RMContainer reservedContainer, FiCaSchedulerNode newNode) {
+    // Don't do this if it has hard-locality preferences
+    if (reservedContainer.hasIncreaseReservation()) {
+      // This means a container update request (like increase / promote)
+      return false;
+    }
+
+    // For normal requests
+    FiCaSchedulerApp app =
+        preemptionContext.getScheduler().getApplicationAttempt(
+            reservedContainer.getApplicationAttemptId());
+    ResourceRequest offswithRequest = app.getAppSchedulingInfo().getResourceRequest(
+        reservedContainer.getReservedSchedulerKey(), ResourceRequest.ANY);
+    if (!offswithRequest.getRelaxLocality()) {
+      // This is a hard locality request
+      return false;
+    }
+
+    // Check if newNode's partition matches requested partition
+    if (!StringUtils.equals(reservedContainer.getNodeLabelExpression(),
+        newNode.getPartition())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  private void tryToMakeBetterReservationPlacement(
+      RMContainer reservedContainer,
+      List<FiCaSchedulerNode> allSchedulerNodes) {
+    for (FiCaSchedulerNode targetNode : allSchedulerNodes) {
+      // Precheck if we can move the rmContainer to the new targetNode
+      if (!preChecksForMovingReservedContainerToNode(reservedContainer,
+          targetNode)) {
+        continue;
+      }
+
+      if (canPreemptEnoughResourceForAsked(
+          reservedContainer.getReservedResource(),
+          reservedContainer.getQueueName(), targetNode, true, null)) {
+        NodeId fromNode = reservedContainer.getNodeId();
+
+        // We can place container to this targetNode, so just go ahead and notify
+        // scheduler
+        if (preemptionContext.getScheduler().moveReservedContainer(
+            reservedContainer, targetNode)) {
+          LOG.info("Successfully moved reserved container=" + reservedContainer
+              .getContainerId() + " from targetNode=" + fromNode
+              + " to targetNode=" + targetNode.getNodeID());
+          touchedNodes.add(targetNode.getNodeID());
+        }
+      }
+    }
+  }
+
+  /**
+   * Do we allow the demanding queue to preempt resources from other queues?
+   * A satisfied queue is not allowed to preempt resource from other queues.
+   * @param demandingQueue
+   * @return allowed/not
+   */
+  private boolean isQueueSatisfied(String demandingQueue,
+      String partition) {
+    TempQueuePerPartition tq = preemptionContext.getQueueByPartition(
+        demandingQueue, partition);
+    if (null == tq) {
+      return false;
+    }
+
+    Resource guaranteed = tq.getGuaranteed();
+    Resource usedDeductReservd = Resources.subtract(tq.getUsed(),
+        tq.getReserved());
+    Resource markedToPreemptFromOtherQueue = toPreemptedFromOtherQueues.get(
+        demandingQueue, partition);
+    if (null == markedToPreemptFromOtherQueue) {
+      markedToPreemptFromOtherQueue = Resources.none();
+    }
+
+    // return Used - reserved + to-preempt-from-other-queue >= guaranteed
+    boolean flag = Resources.greaterThanOrEqual(rc, clusterResource,
+        Resources.add(usedDeductReservd, markedToPreemptFromOtherQueue),
+        guaranteed);
+    return flag;
+  }
+
+  private void incToPreempt(String queue, String partition,
+      Resource allocated) {
+    Resource total = toPreemptedFromOtherQueues.get(queue, partition);
+    if (null == total) {
+      total = Resources.createResource(0);
+      toPreemptedFromOtherQueues.put(queue, partition, total);
+    }
+
+    Resources.addTo(total, allocated);
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource,
+      Resource totalPreemptedResourceAllowed) {
+    // Initialize digraph from queues
+    // TODO (wangda): only do this when queue refreshed.
+    priorityDigraph.clear();
+    intializePriorityDigraph();
+
+    // When all queues are set to same priority, or priority is not respected,
+    // direct return.
+    if (priorityDigraph.isEmpty()) {
+      return selectedCandidates;
+    }
+
+    // Save parameters to be shared by other methods
+    this.selectedCandidates = selectedCandidates;
+    this.clusterResource = clusterResource;
+    this.totalPreemptionAllowed = totalPreemptedResourceAllowed;
+
+    toPreemptedFromOtherQueues.clear();
+
+    reservedContainers = new ArrayList<>();
+
+    // Clear temp-scheduler-node-map every time when doing selection of
+    // containers.
+    tempSchedulerNodeMap.clear();
+    touchedNodes = new HashSet<>();
+
+    // Add all reserved containers for analysis
+    List<FiCaSchedulerNode> allSchedulerNodes =
+        preemptionContext.getScheduler().getAllNodes();
+    for (FiCaSchedulerNode node : allSchedulerNodes) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (null != reservedContainer) {
+        // Add to reservedContainers list if the queue that the reserved
+        // container belongs to has higher priority than at least one queue
+        if (priorityDigraph.containsRow(
+            reservedContainer.getQueueName())) {
+          reservedContainers.add(reservedContainer);
+        }
+      }
+    }
+
+    // Sort reserved container by creation time
+    Collections.sort(reservedContainers, CONTAINER_CREATION_TIME_COMPARATOR);
+
+    long currentTime = System.currentTimeMillis();
+
+    // From the beginning of the list
+    for (RMContainer reservedContainer : reservedContainers) {
+      // Only try to preempt for a reserved container once at least minTimeout
+      // has elapsed since it was created and it still cannot be allocated
+      if (currentTime - reservedContainer.getCreationTime() < minTimeout) {
+        continue;
+      }
+
+      FiCaSchedulerNode node = preemptionContext.getScheduler().getNode(
+          reservedContainer.getReservedNode());
+      if (null == node) {
+        // Something is wrong, ignore
+        continue;
+      }
+
+      List<RMContainer> newlySelectedToBePreemptContainers = new ArrayList<>();
+
+      // Check if we can preempt for this queue
+      // We will skip if the demanding queue is already satisfied.
+      String demandingQueueName = reservedContainer.getQueueName();
+      boolean demandingQueueSatisfied = isQueueSatisfied(demandingQueueName,
+          node.getPartition());
+
+      // We will continue check if it is possible to preempt reserved container
+      // from the node.
+      boolean canPreempt = false;
+      if (!demandingQueueSatisfied) {
+        canPreempt = canPreemptEnoughResourceForAsked(
+            reservedContainer.getReservedResource(), demandingQueueName, node,
+            false, newlySelectedToBePreemptContainers);
+      }
+
+      // Add selected containers if we can allocate the reserved container by
+      // preempting others
+      if (canPreempt) {
+        touchedNodes.add(node.getNodeID());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to preempt following containers to make reserved "
+              + "container=" + reservedContainer.getContainerId() + " on node="
+              + node.getNodeID() + " can be allocated:");
+        }
+
+        // Update to-be-preempt
+        incToPreempt(demandingQueueName, node.getPartition(),
+            reservedContainer.getReservedResource());
+
+        for (RMContainer c : newlySelectedToBePreemptContainers) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" --container=" + c.getContainerId() + " resource=" + c
+                .getReservedResource());
+          }
+
+          Set<RMContainer> containers = selectedCandidates.get(
+              c.getApplicationAttemptId());
+          if (null == containers) {
+            containers = new HashSet<>();
+            selectedCandidates.put(c.getApplicationAttemptId(), containers);
+          }
+          containers.add(c);
+
+          // Update totalPreemptionResourceAllowed
+          Resources.subtractFrom(totalPreemptedResourceAllowed,
+              c.getAllocatedResource());
+        }
+      } else if (!demandingQueueSatisfied) {
+        // We failed to get enough resource to allocate the container
+        // This typically happens when the reserved node is not proper, will
+        // try to see if we can reserve the container on a better host.
+        // Only do this if the demanding queue is not satisfied.
+        //
+        // TODO (wangda): do more tests before making it usable
+        //
+        if (allowMoveReservation) {
+          tryToMakeBetterReservationPlacement(reservedContainer,
+              allSchedulerNodes);
+        }
+      }
+    }
+
+    return selectedCandidates;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 9783457..7eab015 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -53,6 +53,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
 
   protected Resource pendingDeductReserved;
 
+  // Relative priority of this queue to its parent
+  // If parent queue's ordering policy doesn't respect priority,
+  // this will be always 0
+  int relativePriority = 0;
+  TempQueuePerPartition parent = null;
+
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@@ -114,8 +120,15 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
         Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
-    // remain = avail - min(avail, (max - assigned), (current + pending -
-    // assigned))
+    // accepted = min{avail,
+    //               max - assigned,
+    //               current + pending - assigned,
+    //               # Make sure a queue will not get more than max of its
+    //               # used/guaranteed, this is to make sure preemption won't
+    //               # happen if all active queues are beyond their guaranteed
+    //               # This is for leaf queue only.
+    //               max(guaranteed, used) - assigned}
+    // remain = avail - accepted
     Resource accepted = Resources.min(rc, clusterResource,
         absMaxCapIdealAssignedDelta,
         Resources.min(rc, clusterResource, avail, Resources
@@ -137,6 +150,21 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
             .subtract(Resources.add(getUsed(),
                 (considersReservedResource ? pending : pendingDeductReserved)),
                 idealAssigned)));
+
+    // For leaf queue: accept = min(accept, max(guaranteed, used) - assigned)
+    // Why only for leaf queue?
+    // Because for a satisfied parent queue, it could have some under-utilized
+    // leaf queues. Such under-utilized leaf queues could preempt resources
+    // from over-utilized leaf queues located in other hierarchies.
+    if (null == children || children.isEmpty()) {
+      Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
+          Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
+          idealAssigned);
+      maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
+          maxOfGuranteedAndUsedDeductAssigned, Resources.none());
+      accepted = Resources.min(rc, clusterResource, accepted,
+          maxOfGuranteedAndUsedDeductAssigned);
+    }
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
new file mode 100644
index 0000000..320f262
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.List;
+
+/**
+ * This class saves the necessary information copied from
+ * FiCaSchedulerNode. It is added mainly for performance reasons: it
+ * can be cached to avoid hitting the scheduler again and again. In addition,
+ * we can add some preemption-required fields to the class.
+ */
+public class TempSchedulerNode {
+  private List<RMContainer> runningContainers;
+  private RMContainer reservedContainer;
+  private Resource totalResource;
+
+  // excluded reserved resource
+  private Resource allocatedResource;
+
+  // total - allocated
+  private Resource availableResource;
+
+  // just a shortcut for reservedContainer.getReservedResource().
+  private Resource reservedResource;
+
+  private NodeId nodeId;
+
+  public static TempSchedulerNode fromSchedulerNode(
+      FiCaSchedulerNode schedulerNode) {
+    TempSchedulerNode n = new TempSchedulerNode();
+    n.totalResource = Resources.clone(schedulerNode.getTotalResource());
+    n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource());
+    n.runningContainers = schedulerNode.getCopiedListOfRunningContainers();
+    n.reservedContainer = schedulerNode.getReservedContainer();
+    if (n.reservedContainer != null) {
+      n.reservedResource = n.reservedContainer.getReservedResource();
+    } else {
+      n.reservedResource = Resources.none();
+    }
+    n.availableResource = Resources.subtract(n.totalResource,
+        n.allocatedResource);
+    n.nodeId = schedulerNode.getNodeID();
+    return n;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  public List<RMContainer> getRunningContainers() {
+    return runningContainers;
+  }
+
+  public void setRunningContainers(List<RMContainer> runningContainers) {
+    this.runningContainers = runningContainers;
+  }
+
+  public RMContainer getReservedContainer() {
+    return reservedContainer;
+  }
+
+  public void setReservedContainer(RMContainer reservedContainer) {
+    this.reservedContainer = reservedContainer;
+  }
+
+  public Resource getTotalResource() {
+    return totalResource;
+  }
+
+  public void setTotalResource(Resource totalResource) {
+    this.totalResource = totalResource;
+  }
+
+  public Resource getAllocatedResource() {
+    return allocatedResource;
+  }
+
+  public void setAllocatedResource(Resource allocatedResource) {
+    this.allocatedResource = allocatedResource;
+  }
+
+  public Resource getAvailableResource() {
+    return availableResource;
+  }
+
+  public void setAvailableResource(Resource availableResource) {
+    this.availableResource = availableResource;
+  }
+
+  public Resource getReservedResource() {
+    return reservedResource;
+  }
+
+  public void setReservedResource(Resource reservedResource) {
+    this.reservedResource = reservedResource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 0afd765..b647224 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -548,7 +548,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       container.reservedResource = e.getReservedResource();
       container.reservedNode = e.getReservedNode();
       container.reservedSchedulerKey = e.getReservedSchedulerKey();
-      
+
+      Container c = container.getContainer();
+      if (c != null) {
+        c.setNodeId(container.reservedNode);
+      }
+
       if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
           .contains(container.getState())) {
         // When container's state != NEW/RESERVED, it is an increase reservation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 15fd830..59ca81b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -407,7 +407,7 @@ public abstract class SchedulerNode {
    * Set the reserved container in the node.
    * @param reservedContainer Reserved container in the node.
    */
-  protected synchronized void
+  public synchronized void
   setReservedContainer(RMContainer reservedContainer) {
     this.reservedContainer = reservedContainer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index cefa1e2..e9ef319 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -109,6 +109,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   protected ReentrantReadWriteLock.ReadLock readLock;
   protected ReentrantReadWriteLock.WriteLock writeLock;
 
+  volatile Priority priority = Priority.newInstance(0);
+
   public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     this.labelManager = cs.getRMContext().getNodeLabelManager();
@@ -336,6 +338,9 @@ public abstract class AbstractCSQueue implements CSQueue {
           csContext.getConfiguration().getReservationContinueLook();
 
       this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
+
+      this.priority = csContext.getConfiguration().getQueuePriority(
+          getQueuePath());
     } finally {
       writeLock.unlock();
     }
@@ -934,4 +939,9 @@ public abstract class AbstractCSQueue implements CSQueue {
       this.writeLock.unlock();
     }
   }
+
+  @Override
+  public Priority getPriority() {
+    return this.priority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index e30ec39..2e3ced5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -372,4 +373,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    */
   public void validateSubmitApplication(ApplicationId applicationId,
       String userName, String queue) throws AccessControlException;
+
+  /**
+   * Get priority of queue
+   * @return queue priority
+   */
+  Priority getPriority();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 724458b..e76c54e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -269,16 +269,6 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public Comparator<CSQueue> getNonPartitionedQueueComparator() {
-    return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR;
-  }
-
-  @Override
-  public PartitionedQueueComparator getPartitionedQueueComparator() {
-    return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR;
-  }
-
-  @Override
   public int getNumClusterNodes() {
     return nodeTracker.nodeCount();
   }
@@ -2505,4 +2495,69 @@ public class CapacityScheduler extends
   public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
     return this.queueManager;
   }
+
+  /**
+   * Try to move a reserved container to a targetNode.
+   * If the targetNode already holds a reserved container, the move is
+   * cancelled and false is returned.
+   *
+   * @param toBeMovedContainer reserved container will be moved
+   * @param targetNode targetNode
+   * @return true if move succeeded. Return false if the targetNode is reserved by
+   *         a different container or move failed because of any other reasons.
+   */
+  public boolean moveReservedContainer(RMContainer toBeMovedContainer,
+      FiCaSchedulerNode targetNode) {
+    try {
+      writeLock.lock();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to move container=" + toBeMovedContainer + " to node="
+            + targetNode.getNodeID());
+      }
+
+      FiCaSchedulerNode sourceNode = getNode(toBeMovedContainer.getNodeId());
+      if (null == sourceNode) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Failed to move reservation, cannot find source node="
+              + toBeMovedContainer.getNodeId());
+        }
+        return false;
+      }
+
+      // Target node updated?
+      if (getNode(targetNode.getNodeID()) != targetNode) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Failed to move reservation, node updated or removed, moving "
+                  + "cancelled.");
+        }
+        return false;
+      }
+
+      // Target node's reservation status changed?
+      if (targetNode.getReservedContainer() != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Target node's reservation status changed, moving cancelled.");
+        }
+        return false;
+      }
+
+      FiCaSchedulerApp app = getApplicationAttempt(
+          toBeMovedContainer.getApplicationAttemptId());
+      if (null == app) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot find to-be-moved container's application="
+              + toBeMovedContainer.getApplicationAttemptId());
+        }
+        return false;
+      }
+
+      // finally, move the reserved container
+      return app.moveReservation(toBeMovedContainer, sourceNode, targetNode);
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index eb148d2..43ec390 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -18,19 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.StringTokenizer;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -44,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationACL;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -51,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingP
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -59,7 +51,17 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
 
@@ -127,14 +129,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final String MAXIMUM_ALLOCATION_VCORES =
       "maximum-allocation-vcores";
-  
+
+  /**
+   * Ordering policy of queues
+   */
   public static final String ORDERING_POLICY = "ordering-policy";
-  
-  public static final String FIFO_ORDERING_POLICY = "fifo";
 
-  public static final String FAIR_ORDERING_POLICY = "fair";
+  /*
+   * Ordering policy inside a leaf queue to sort apps
+   */
+  public static final String FIFO_APP_ORDERING_POLICY = "fifo";
+
+  public static final String FAIR_APP_ORDERING_POLICY = "fair";
 
-  public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY;
+  public static final String DEFAULT_APP_ORDERING_POLICY =
+      FIFO_APP_ORDERING_POLICY;
   
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@@ -298,6 +307,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     String queueName = PREFIX + queue + DOT;
     return queueName;
   }
+
+  static String getQueueOrderingPolicyPrefix(String queue) {
+    String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
+    return queueName;
+  }
   
   private String getNodeLabelPrefix(String queue, String label) {
     if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
@@ -400,20 +414,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         DEFAULT_USER_LIMIT);
     return userLimit;
   }
-  
+
+  // TODO (wangda): We need to better distinguish app ordering policy and queue
+  // ordering policy's classname / configuration options, etc. And dedup code
+  // if possible.
   @SuppressWarnings("unchecked")
-  public <S extends SchedulableEntity> OrderingPolicy<S> getOrderingPolicy(
+  public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
       String queue) {
   
-    String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, 
-      DEFAULT_ORDERING_POLICY);
+    String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
+        DEFAULT_APP_ORDERING_POLICY);
     
     OrderingPolicy<S> orderingPolicy;
     
-    if (policyType.trim().equals(FIFO_ORDERING_POLICY)) {
+    if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) {
        policyType = FifoOrderingPolicy.class.getName();
     }
-    if (policyType.trim().equals(FAIR_ORDERING_POLICY)) {
+    if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
        policyType = FairOrderingPolicy.class.getName();
     }
     try {
@@ -734,6 +751,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return Resources.createResource(maximumMemory, maximumCores);
   }
 
+  @Private
+  public Priority getQueuePriority(String queue) {
+    String queuePolicyPrefix = getQueuePrefix(queue);
+    Priority pri = Priority.newInstance(
+        getInt(queuePolicyPrefix + "priority", 0));
+    return pri;
+  }
+
+  @Private
+  public void setQueuePriority(String queue, int priority) {
+    String queuePolicyPrefix = getQueuePrefix(queue);
+    setInt(queuePolicyPrefix + "priority", priority);
+  }
+
   /**
    * Get the per queue setting for the maximum limit to allocate to
    * each container request.
@@ -1204,4 +1235,161 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED);
     return maxApplicationsPerQueue;
   }
+
+  /**
+   * Ordering policy inside a parent queue to sort queues
+   */
+
+  /**
+   * The queue with lower relative usage gets the next resource; this is the default
+   */
+  public static final String QUEUE_UTILIZATION_ORDERING_POLICY = "utilization";
+
+  /**
+   * Combination of relative usage and priority
+   */
+  public static final String QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY =
+      "priority-utilization";
+
+  public static final String DEFAULT_QUEUE_ORDERING_POLICY =
+      QUEUE_UTILIZATION_ORDERING_POLICY;
+
+
+  @Private
+  public void setQueueOrderingPolicy(String queue, String policy) {
+    set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
+  }
+
+  @Private
+  public QueueOrderingPolicy getQueueOrderingPolicy(String queue,
+      String parentPolicy) {
+    String defaultPolicy = parentPolicy;
+    if (null == defaultPolicy) {
+      defaultPolicy = DEFAULT_QUEUE_ORDERING_POLICY;
+    }
+
+    String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
+        defaultPolicy);
+
+    QueueOrderingPolicy qop;
+    if (policyType.trim().equals(QUEUE_UTILIZATION_ORDERING_POLICY)) {
+      // Doesn't respect priority
+      qop = new PriorityUtilizationQueueOrderingPolicy(false);
+    } else if (policyType.trim().equals(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY)) {
+      qop = new PriorityUtilizationQueueOrderingPolicy(true);
+    } else {
+      String message =
+          "Unable to construct queue ordering policy=" + policyType + " queue="
+              + queue;
+      throw new YarnRuntimeException(message);
+    }
+
+    return qop;
+  }
+
+  /*
+   * Get global configuration for ordering policies
+   */
+  private String getOrderingPolicyGlobalConfigKey(String orderPolicyName,
+      String configKey) {
+    return PREFIX + ORDERING_POLICY + DOT + orderPolicyName + DOT + configKey;
+  }
+
+  /**
+   * Global configurations of queue-priority-utilization ordering policy
+   */
+  private static final String UNDER_UTILIZED_PREEMPTION_ENABLED =
+      "underutilized-preemption.enabled";
+
+  /**
+   * Do we allow under-utilized queue with higher priority to preempt queue
+   * with lower priority *even if queue with lower priority is not satisfied*.
+   *
+   * For example, two queues, a and b
+   * a.priority = 1, (a.used-capacity - a.reserved-capacity) = 40%
+   * b.priority = 0, b.used-capacity = 30%
+   *
+   * Set this configuration to true to allow queue-a to preempt container from
+   * queue-b.
+   *
+   * (The reason we deduct reserved-capacity from used-capacity for the queue
+   * with higher priority is: the reserved-capacity is just scheduler's internal
+   * implementation to allocate large containers, it is not possible for
+   * application to use such reserved-capacity. It is possible that a queue with
+   * large container requests has a large number of containers but cannot
+   * allocate from any of them. But scheduler will make sure a satisfied queue
+   * will not preempt resource from any other queues. A queue is considered to
+   * be satisfied when queue's used-capacity - reserved-capacity ≥
+   * guaranteed-capacity.)
+   *
+   * @return allowed or not
+   */
+  public boolean getPUOrderingPolicyUnderUtilizedPreemptionEnabled() {
+    return getBoolean(getOrderingPolicyGlobalConfigKey(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+        UNDER_UTILIZED_PREEMPTION_ENABLED), false);
+  }
+
+  @VisibleForTesting
+  public void setPUOrderingPolicyUnderUtilizedPreemptionEnabled(
+      boolean enabled) {
+    setBoolean(getOrderingPolicyGlobalConfigKey(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+        UNDER_UTILIZED_PREEMPTION_ENABLED), enabled);
+  }
+
+  private static final String UNDER_UTILIZED_PREEMPTION_DELAY =
+      "underutilized-preemption.reserved-container-delay-ms";
+
+  /**
+   * When a reserved container of an underutilized queue is created, preemption
+   * will kick in after the specified delay (in ms).
+   *
+   * The total time to preempt resources for a reserved container from higher
+   * priority queue will be: reserved-container-delay-ms +
+   * {@link CapacitySchedulerConfiguration#PREEMPTION_WAIT_TIME_BEFORE_KILL}.
+   *
+   * This parameter is added to make preemption from lower priority queue which
+   * is underutilized to be more careful. This parameter takes effect when
+   * underutilized-preemption.enabled set to true.
+   *
+   * @return delay
+   */
+  public long getPUOrderingPolicyUnderUtilizedPreemptionDelay() {
+    return getLong(getOrderingPolicyGlobalConfigKey(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+        UNDER_UTILIZED_PREEMPTION_DELAY), 60000L);
+  }
+
+  @VisibleForTesting
+  public void setPUOrderingPolicyUnderUtilizedPreemptionDelay(
+      long timeout) {
+    setLong(getOrderingPolicyGlobalConfigKey(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+        UNDER_UTILIZED_PREEMPTION_DELAY), timeout);
+  }
+
+  private static final String UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION =
+      "underutilized-preemption.allow-move-reservation";
+
+  /**
+   * When doing preemption from under-satisfied queues for a priority queue,
+   * do we allow moving a reserved container from one host to another?
+   *
+   * @return allow or not
+   */
+  public boolean getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation() {
+    return getBoolean(getOrderingPolicyGlobalConfigKey(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+        UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), false);
+  }
+
+  @VisibleForTesting
+  public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(
+      boolean allowMoveReservation) {
+    setBoolean(getOrderingPolicyGlobalConfigKey(
+        QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
+        UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 504acb9..9aeaec6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -61,10 +61,6 @@ public interface CapacitySchedulerContext {
   Configuration getConf();
 
   ResourceCalculator getResourceCalculator();
-
-  Comparator<CSQueue> getNonPartitionedQueueComparator();
-  
-  PartitionedQueueComparator getPartitionedQueueComparator();
   
   FiCaSchedulerNode getNode(NodeId nodeId);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index f204c74..8cae6c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -75,9 +75,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
     }
   };
 
-  static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR =
-      new PartitionedQueueComparator();
-
   static class QueueHook {
     public CSQueue hook(CSQueue queue) {
       return queue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 134edb7..6b0a9066 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -194,7 +194,7 @@ public class LeafQueue extends AbstractCSQueue {
       CapacitySchedulerConfiguration conf = csContext.getConfiguration();
 
       setOrderingPolicy(
-          conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
+          conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
 
       userLimit = conf.getUserLimit(getQueuePath());
       userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -296,7 +296,7 @@ public class LeafQueue extends AbstractCSQueue {
               .toString() + "\n" + "reservationsContinueLooking = "
               + reservationsContinueLooking + "\n" + "preemptionDisabled = "
               + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
-              + defaultAppPriorityPerQueue);
+              + defaultAppPriorityPerQueue + "\npriority = " + priority);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index ec2cccb..75ab610 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -18,18 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
@@ -73,29 +62,34 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 @Private
 @Evolving
 public class ParentQueue extends AbstractCSQueue {
 
   private static final Log LOG = LogFactory.getLog(ParentQueue.class);
 
-  protected final Set<CSQueue> childQueues;  
+  protected final List<CSQueue> childQueues;
   private final boolean rootQueue;
-  private final Comparator<CSQueue> nonPartitionedQueueComparator;
-  private final PartitionedQueueComparator partitionQueueComparator;
   private volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
+  private QueueOrderingPolicy queueOrderingPolicy;
+
   public ParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
-    this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
-    this.partitionQueueComparator = new PartitionedQueueComparator();
-
     this.rootQueue = (parent == null);
 
     float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());
@@ -107,7 +101,7 @@ public class ParentQueue extends AbstractCSQueue {
           ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
     }
 
-    this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
+    this.childQueues = new ArrayList<>();
 
     setupQueueConfigs(cs.getClusterResource());
 
@@ -116,7 +110,14 @@ public class ParentQueue extends AbstractCSQueue {
         ", fullname=" + getQueuePath());
   }
 
-  void setupQueueConfigs(Resource clusterResource)
+  // Returns the config name of the queue ordering policy, or null if unset
+  private String getQueueOrderingPolicyConfigName() {
+    return queueOrderingPolicy == null ?
+        null :
+        queueOrderingPolicy.getConfigName();
+  }
+
+  protected void setupQueueConfigs(Resource clusterResource)
       throws IOException {
     try {
       writeLock.lock();
@@ -134,13 +135,22 @@ public class ParentQueue extends AbstractCSQueue {
         }
       }
 
+      // Initialize queue ordering policy
+      queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy(
+          getQueuePath(), parent == null ?
+              null :
+              ((ParentQueue) parent).getQueueOrderingPolicyConfigName());
+      queueOrderingPolicy.setQueues(childQueues);
+
       LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
           + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
           + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
           + ", absoluteMaxCapacity=" + this.queueCapacities
           .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
           + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
-          + ", reservationsContinueLooking=" + reservationsContinueLooking);
+          + ", reservationsContinueLooking=" + reservationsContinueLooking
+          + ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
+          + ", priority=" + priority);
     } finally {
       writeLock.unlock();
     }
@@ -294,8 +304,8 @@ public class ParentQueue extends AbstractCSQueue {
 
       // Re-configure existing child queues and add new ones
       // The CS has already checked to ensure all existing child queues are present!
-      Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
-      Map<String, CSQueue> newChildQueues = getQueues(
+      Map<String, CSQueue> currentChildQueues = getQueuesMap(childQueues);
+      Map<String, CSQueue> newChildQueues = getQueuesMap(
           newlyParsedParentQueue.childQueues);
       for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
         String newChildQueueName = e.getKey();
@@ -338,7 +348,7 @@ public class ParentQueue extends AbstractCSQueue {
     }
   }
 
-  Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
+  private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) {
     Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>();
     for (CSQueue queue : queues) {
       queuesMap.put(queue.getQueueName(), queue);
@@ -680,13 +690,7 @@ public class ParentQueue extends AbstractCSQueue {
 
   private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
       String partition) {
-    // Previously we keep a sorted list for default partition, it is not good
-    // when multi-threading scheduler is enabled, so to make a simpler code
-    // now re-sort queue every time irrespective to node partition.
-    partitionQueueComparator.setPartitionToLookAt(partition);
-    List<CSQueue> childrenList = new ArrayList<>(childQueues);
-    Collections.sort(childrenList, partitionQueueComparator);
-    return childrenList.iterator();
+    return queueOrderingPolicy.getAssignmentIterator(partition);
   }
 
   private CSAssignment assignContainersToChildQueues(Resource cluster,
@@ -1083,4 +1087,8 @@ public class ParentQueue extends AbstractCSQueue {
       this.writeLock.unlock();
     }
   }
+
+  public QueueOrderingPolicy getQueueOrderingPolicy() {
+    return queueOrderingPolicy;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1309accd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
deleted file mode 100644
index 477c615..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import java.util.Comparator;
-
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-
-public class PartitionedQueueComparator implements Comparator<CSQueue> {
-  private String partitionToLookAt = null;
-  
-  public void setPartitionToLookAt(String partitionToLookAt) {
-    this.partitionToLookAt = partitionToLookAt;
-  }
-  
-
-  @Override
-  public int compare(CSQueue q1, CSQueue q2) {
-    /*
-     * 1. Check accessible to given partition, if one queue accessible and
-     * the other not, accessible queue goes first.
-     */
-    boolean q1Accessible =
-        q1.getAccessibleNodeLabels().contains(partitionToLookAt)
-            || q1.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
-    boolean q2Accessible =
-        q2.getAccessibleNodeLabels().contains(partitionToLookAt)
-            || q2.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
-    if (q1Accessible && !q2Accessible) {
-      return -1;
-    } else if (!q1Accessible && q2Accessible) {
-      return 1;
-    }
-
-    /*
-     * 
-     * 2. When two queue has same accessibility, check who will go first:
-     * Now we simply compare their used resource on the partition to lookAt
-     */
-    float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt);
-    float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt);
-    if (Math.abs(used1 - used2) < 1e-6) {
-      // When used capacity is same, compare their guaranteed-capacity
-      float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt);
-      float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt);
-      
-      // when cap1 == cap2, we will compare queue's name
-      if (Math.abs(cap1 - cap2) < 1e-6) {
-        return q1.getQueueName().compareTo(q2.getQueueName());
-      }
-      return Float.compare(cap2, cap1);
-    }
-    
-    return Float.compare(used1, used2);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message