From: jianhe@apache.org
To: common-commits@hadoop.apache.org
Date: Sat, 25 Apr 2015 00:03:43 -0000
Message-Id: <57dd8a677fe144b094e7f90838c44ded@git.apache.org>
Subject: [2/2] hadoop git commit: YARN-2498. Respect labels in preemption policy of capacity scheduler for inter-queue preemption. Contributed by Wangda Tan

YARN-2498. Respect labels in preemption policy of capacity scheduler for
inter-queue preemption. Contributed by Wangda Tan

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d497f6ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d497f6ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d497f6ea

Branch: refs/heads/trunk
Commit: d497f6ea2be559aa31ed76f37ae949dbfabe2a51
Parents: dcc5455
Author: Jian He
Authored: Fri Apr 24 17:03:13 2015 -0700
Committer: Jian He
Committed: Fri Apr 24 17:03:13 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |    3 +
 .../ProportionalCapacityPreemptionPolicy.java   |  585 +++++----
 .../rmcontainer/RMContainerImpl.java            |   28 +-
 .../scheduler/capacity/CapacityScheduler.java   |    2 +-
 .../scheduler/capacity/LeafQueue.java           |   70 +-
 .../scheduler/common/AssignmentInformation.java |   31 +-
 ...estProportionalCapacityPreemptionPolicy.java |   94 +-
 ...pacityPreemptionPolicyForNodePartitions.java | 1211 ++++++++++++++++++
 .../scheduler/capacity/TestChildQueueOrder.java |    2 +-
 .../scheduler/capacity/TestLeafQueue.java       |    4 +-
 .../TestNodeLabelContainerAllocation.java       |   16 +
 .../scheduler/capacity/TestParentQueue.java     |    2 +-
 12 files changed, 1750 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 44b87e5..a830771 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -102,6 +102,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
 
+    YARN-2498. Respect labels in preemption policy of capacity scheduler for
+    inter-queue preemption. (Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 2ab4197..1f47b5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,11 +27,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.TreeSet;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -49,7 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import
org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; /** * This class implement a {@link SchedulingEditPolicy} that is designed to be @@ -130,7 +132,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; - private Map> labels; + private Map> queueToPartitions = + new HashMap<>(); + private RMNodeLabelsManager nlm; public ProportionalCapacityPreemptionPolicy() { clock = new SystemClock(); @@ -170,7 +174,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); - labels = null; + nlm = scheduler.getRMContext().getNodeLabelManager(); } @VisibleForTesting @@ -182,34 +186,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic public void editSchedule() { CSQueue root = scheduler.getRootQueue(); Resource clusterResources = Resources.clone(scheduler.getClusterResource()); - clusterResources = getNonLabeledResources(clusterResources); - setNodeLabels(scheduler.getRMContext().getNodeLabelManager() - .getNodeLabels()); containerBasedPreemptOrKill(root, clusterResources); } - - /** - * Setting Node Labels - * - * @param nodelabels - */ - public void setNodeLabels(Map> nodelabels) { - labels = nodelabels; - } - - /** - * This method returns all non labeled resources. - * - * @param clusterResources - * @return Resources - */ - private Resource getNonLabeledResources(Resource clusterResources) { - RMContext rmcontext = scheduler.getRMContext(); - RMNodeLabelsManager lm = rmcontext.getNodeLabelManager(); - Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, - clusterResources); - return res == null ? clusterResources : res; - } /** * This method selects and tracks containers to be preempted. 
If a container @@ -220,28 +198,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic */ private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { + // All partitions to look at + Set allPartitions = new HashSet<>(); + allPartitions.addAll(scheduler.getRMContext() + .getNodeLabelManager().getClusterNodeLabelNames()); + allPartitions.add(RMNodeLabelsManager.NO_LABEL); // extract a summary of the queues from scheduler - TempQueue tRoot; synchronized (scheduler) { - tRoot = cloneQueues(root, clusterResources); + queueToPartitions.clear(); + + for (String partitionToLookAt : allPartitions) { + cloneQueues(root, + nlm.getResourceByLabel(partitionToLookAt, clusterResources), + partitionToLookAt); + } } - // compute the ideal distribution of resources among queues - // updates cloned queues state accordingly - tRoot.idealAssigned = tRoot.guaranteed; + // compute total preemption allowed Resource totalPreemptionAllowed = Resources.multiply(clusterResources, percentageClusterPreemptionAllowed); - List queues = - recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); + + Set leafQueueNames = null; + for (String partition : allPartitions) { + TempQueuePerPartition tRoot = + getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition); + // compute the ideal distribution of resources among queues + // updates cloned queues state accordingly + tRoot.idealAssigned = tRoot.guaranteed; + + leafQueueNames = + recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); + } // based on ideal allocation select containers to be preempted from each // queue and each application Map> toPreempt = - getContainersToPreempt(queues, clusterResources); + getContainersToPreempt(leafQueueNames, clusterResources); if (LOG.isDebugEnabled()) { - logToCSV(queues); + logToCSV(new ArrayList(leafQueueNames)); } // if we are in observeOnly mode return before any action is taken @@ -252,6 +248,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // preempt (or kill) the selected containers for (Map.Entry> e : toPreempt.entrySet()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + e.getKey() + + " #containers-to-be-preempted=" + e.getValue().size()); + } for (RMContainer container : e.getValue()) { // if we tried to preempt this for more than maxWaitTime if (preempted.get(container) != null && @@ -291,23 +291,24 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param totalPreemptionAllowed maximum amount of preemption allowed * @return a list of leaf queues updated with preemption targets */ - private List recursivelyComputeIdealAssignment( - TempQueue root, Resource totalPreemptionAllowed) { - List leafs = new ArrayList(); + private Set recursivelyComputeIdealAssignment( + TempQueuePerPartition root, Resource totalPreemptionAllowed) { + Set leafQueueNames = new HashSet<>(); if (root.getChildren() != null && root.getChildren().size() > 0) { // compute ideal distribution at this level computeIdealResourceDistribution(rc, root.getChildren(), totalPreemptionAllowed, root.idealAssigned); // compute recursively for lower levels and build list of leafs - for(TempQueue t : root.getChildren()) { - leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed)); + for(TempQueuePerPartition t : root.getChildren()) { + leafQueueNames.addAll(recursivelyComputeIdealAssignment(t, + totalPreemptionAllowed)); } } else { // we are in a leaf nothing to do, just 
return yourself - return Collections.singletonList(root); + return ImmutableSet.of(root.queueName); } - return leafs; + return leafQueueNames; } /** @@ -324,20 +325,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param tot_guarant the amount of capacity assigned to this pool of queues */ private void computeIdealResourceDistribution(ResourceCalculator rc, - List queues, Resource totalPreemptionAllowed, Resource tot_guarant) { + List queues, Resource totalPreemptionAllowed, + Resource tot_guarant) { // qAlloc tracks currently active queues (will decrease progressively as // demand is met) - List qAlloc = new ArrayList(queues); + List qAlloc = new ArrayList(queues); // unassigned tracks how much resources are still to assign, initialized // with the total capacity for this set of queues Resource unassigned = Resources.clone(tot_guarant); // group queues based on whether they have non-zero guaranteed capacity - Set nonZeroGuarQueues = new HashSet(); - Set zeroGuarQueues = new HashSet(); + Set nonZeroGuarQueues = new HashSet(); + Set zeroGuarQueues = new HashSet(); - for (TempQueue q : qAlloc) { + for (TempQueuePerPartition q : qAlloc) { if (Resources .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) { nonZeroGuarQueues.add(q); @@ -361,7 +363,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // based on ideal assignment computed above and current assignment we derive // how much preemption is required overall Resource totPreemptionNeeded = Resource.newInstance(0, 0); - for (TempQueue t:queues) { + for (TempQueuePerPartition t:queues) { if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) { Resources.addTo(totPreemptionNeeded, Resources.subtract(t.current, t.idealAssigned)); @@ -379,12 +381,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // assign to each queue the amount of actual preemption based on local // information of ideal preemption and scaling factor - for (TempQueue t : queues) { + for (TempQueuePerPartition t : queues) { t.assignPreemption(scalingFactor, rc, tot_guarant); } if (LOG.isDebugEnabled()) { long time = clock.getTime(); - for (TempQueue t : queues) { + for (TempQueuePerPartition t : queues) { LOG.debug(time + ": " + t); } } @@ -400,8 +402,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * distributed uniformly. */ private void computeFixpointAllocation(ResourceCalculator rc, - Resource tot_guarant, Collection qAlloc, Resource unassigned, - boolean ignoreGuarantee) { + Resource tot_guarant, Collection qAlloc, + Resource unassigned, boolean ignoreGuarantee) { // Prior to assigning the unused resources, process each queue as follows: // If current > guaranteed, idealAssigned = guaranteed + untouchable extra // Else idealAssigned = current; @@ -410,10 +412,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // idealAssigned >= current + pending), remove it from consideration. // Sort queues from most under-guaranteed to most over-guaranteed. 
TQComparator tqComparator = new TQComparator(rc, tot_guarant); - PriorityQueue orderedByNeed = - new PriorityQueue(10,tqComparator); - for (Iterator i = qAlloc.iterator(); i.hasNext();) { - TempQueue q = i.next(); + PriorityQueue orderedByNeed = + new PriorityQueue(10, tqComparator); + for (Iterator i = qAlloc.iterator(); i.hasNext();) { + TempQueuePerPartition q = i.next(); if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) { q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra); } else { @@ -442,10 +444,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // place it back in the ordered list of queues, recalculating its place // in the order of most under-guaranteed to most over-guaranteed. In this // way, the most underserved queue(s) are always given resources first. - Collection underserved = + Collection underserved = getMostUnderservedQueues(orderedByNeed, tqComparator); - for (Iterator i = underserved.iterator(); i.hasNext();) { - TempQueue sub = i.next(); + for (Iterator i = underserved.iterator(); i + .hasNext();) { + TempQueuePerPartition sub = i.next(); Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1)); Resource wQidle = sub.offer(wQavail, rc, tot_guarant); @@ -466,13 +469,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // Take the most underserved TempQueue (the one on the head). Collect and // return the list of all queues that have the same idealAssigned // percentage of guaranteed. - protected Collection getMostUnderservedQueues( - PriorityQueue orderedByNeed, TQComparator tqComparator) { - ArrayList underserved = new ArrayList(); + protected Collection getMostUnderservedQueues( + PriorityQueue orderedByNeed, TQComparator tqComparator) { + ArrayList underserved = new ArrayList(); while (!orderedByNeed.isEmpty()) { - TempQueue q1 = orderedByNeed.remove(); + TempQueuePerPartition q1 = orderedByNeed.remove(); underserved.add(q1); - TempQueue q2 = orderedByNeed.peek(); + TempQueuePerPartition q2 = orderedByNeed.peek(); // q1's pct of guaranteed won't be larger than q2's. If it's less, then // return what has already been collected. Otherwise, q1's pct of // guaranteed == that of q2, so add q2 to underserved list during the @@ -491,24 +494,90 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param queues the list of queues to consider */ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, - Collection queues, boolean ignoreGuar) { + Collection queues, boolean ignoreGuar) { Resource activeCap = Resource.newInstance(0, 0); if (ignoreGuar) { - for (TempQueue q : queues) { + for (TempQueuePerPartition q : queues) { q.normalizedGuarantee = (float) 1.0f / ((float) queues.size()); } } else { - for (TempQueue q : queues) { + for (TempQueuePerPartition q : queues) { Resources.addTo(activeCap, q.guaranteed); } - for (TempQueue q : queues) { + for (TempQueuePerPartition q : queues) { q.normalizedGuarantee = Resources.divide(rc, clusterResource, q.guaranteed, activeCap); } } } + private String getPartitionByNodeId(NodeId nodeId) { + return scheduler.getSchedulerNode(nodeId).getPartition(); + } + + /** + * Return should we preempt rmContainer. 
If so, deduct the container's resource from + * resourceToObtainByPartition. + */ + private boolean tryPreemptContainerAndDeductResToObtain( + Map resourceToObtainByPartitions, + RMContainer rmContainer, Resource clusterResource, + Map> preemptMap) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account for the resource of a container more than once + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); + Resource toObtainByPartition = + resourceToObtainByPartitions.get(nodePartition); + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none())) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + // When no more resource needs to be obtained, remove the partition from the map. + if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resourceToObtainByPartitions.remove(nodePartition); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Marked container=" + rmContainer.getContainerId() + + " in partition=" + nodePartition + " will be preempted"); + } + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + private boolean preemptMapContains( + Map> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set rmContainers; + if (null == (rmContainers = preemptMap.get(attemptId))) { + return false; + } + return rmContainers.contains(rmContainer); + } + + private void addToPreemptMap( + Map> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set set; + if (null == (set = preemptMap.get(appAttemptId))) { + set = new HashSet(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } + /** * Based on a resource preemption target, drop reservations of containers and * if necessary select containers for preemption from applications in each @@ -520,64 +589,106 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @return a map of applicationID to set of containers to preempt */ private Map> getContainersToPreempt( - List queues, Resource clusterResource) { + Set leafQueueNames, Resource clusterResource) { - Map> preemptMap = - new HashMap>(); + Map> preemptMap = + new HashMap>(); List skippedAMContainerlist = new ArrayList(); - for (TempQueue qT : queues) { - if (qT.preemptionDisabled && qT.leafQueue != null) { + // Loop all leaf queues + for (String queueName : leafQueueNames) { + // check if preemption disabled for the queue + if (getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { if (LOG.isDebugEnabled()) { - if (Resources.greaterThan(rc, clusterResource, - qT.toBePreempted, Resource.newInstance(0, 0))) { - LOG.debug("Tried to preempt the following " - + "resources from non-preemptable queue: " - + qT.queueName + " - Resources: " + qT.toBePreempted); - } + LOG.debug("skipping from queue=" + queueName + + " because it's a non-preemptable queue"); } continue; } - // we act only if we are violating balance by more than - // maxIgnoredOverCapacity - if (Resources.greaterThan(rc, clusterResource, qT.current, - Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { - // we introduce a dampening factor naturalTerminationFactor that - // accounts for natural termination of containers - Resource resToObtain = - Resources.multiply(qT.toBePreempted,
naturalTerminationFactor); - Resource skippedAMSize = Resource.newInstance(0, 0); - // lock the leafqueue while we scan applications and unreserve - synchronized (qT.leafQueue) { - Iterator desc = - qT.leafQueue.getOrderingPolicy().getPreemptionIterator(); + // compute resToObtainByPartition considered inter-queue preemption + LeafQueue leafQueue = null; + + Map resToObtainByPartition = + new HashMap(); + for (TempQueuePerPartition qT : getQueuePartitions(queueName)) { + leafQueue = qT.leafQueue; + // we act only if we are violating balance by more than + // maxIgnoredOverCapacity + if (Resources.greaterThan(rc, clusterResource, qT.current, + Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { + // we introduce a dampening factor naturalTerminationFactor that + // accounts for natural termination of containers + Resource resToObtain = + Resources.multiply(qT.toBePreempted, naturalTerminationFactor); + // Only add resToObtain when it >= 0 + if (Resources.greaterThan(rc, clusterResource, resToObtain, + Resources.none())) { + resToObtainByPartition.put(qT.partition, resToObtain); + if (LOG.isDebugEnabled()) { + LOG.debug("Queue=" + queueName + " partition=" + qT.partition + + " resource-to-obtain=" + resToObtain); + } + } qT.actuallyPreempted = Resources.clone(resToObtain); - while (desc.hasNext()) { - FiCaSchedulerApp fc = desc.next(); - if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, - Resources.none())) { - break; + } else { + qT.actuallyPreempted = Resources.none(); + } + } + + synchronized (leafQueue) { + // go through all ignore-partition-exclusivity containers first to make + // sure such containers will be preempted first + Map> ignorePartitionExclusivityContainers = + leafQueue.getIgnoreExclusivityRMContainers(); + for (String partition : resToObtainByPartition.keySet()) { + if (ignorePartitionExclusivityContainers.containsKey(partition)) { + TreeSet rmContainers = + ignorePartitionExclusivityContainers.get(partition); + // We will check container from reverse order, so latter submitted + // application's containers will be preempted first. + for (RMContainer c : rmContainers.descendingSet()) { + boolean preempted = + tryPreemptContainerAndDeductResToObtain( + resToObtainByPartition, c, clusterResource, preemptMap); + if (!preempted) { + break; + } } - preemptMap.put( - fc.getApplicationAttemptId(), - preemptFrom(fc, clusterResource, resToObtain, - skippedAMContainerlist, skippedAMSize)); } - Resource maxAMCapacityForThisQueue = Resources.multiply( - Resources.multiply(clusterResource, - qT.leafQueue.getAbsoluteCapacity()), - qT.leafQueue.getMaxAMResourcePerQueuePercent()); - - // Can try preempting AMContainers (still saving atmost - // maxAMCapacityForThisQueue AMResource's) if more resources are - // required to be preempted from this Queue. 
- preemptAMContainers(clusterResource, preemptMap, - skippedAMContainerlist, resToObtain, skippedAMSize, - maxAMCapacityForThisQueue); } + + // preempt other containers + Resource skippedAMSize = Resource.newInstance(0, 0); + Iterator desc = + leafQueue.getOrderingPolicy().getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp fc = desc.next(); + // Once we complete preemption from one partition, we remove it from + // resToObtainByPartition, so when the map becomes empty, no more + // preemption is needed + if (resToObtainByPartition.isEmpty()) { + break; + } + + preemptFrom(fc, clusterResource, resToObtainByPartition, + skippedAMContainerlist, skippedAMSize, preemptMap); + } + + // Can try preempting AMContainers (still saving at most + // maxAMCapacityForThisQueue AMResource's) if more resources are + // required to be preempted from this Queue. + Resource maxAMCapacityForThisQueue = Resources.multiply( + Resources.multiply(clusterResource, + leafQueue.getAbsoluteCapacity()), + leafQueue.getMaxAMResourcePerQueuePercent()); + + preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, + resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue); } } + return preemptMap; } @@ -595,31 +706,27 @@ */ private void preemptAMContainers(Resource clusterResource, Map> preemptMap, - List skippedAMContainerlist, Resource resToObtain, - Resource skippedAMSize, Resource maxAMCapacityForThisQueue) { + List skippedAMContainerlist, + Map resToObtainByPartition, Resource skippedAMSize, + Resource maxAMCapacityForThisQueue) { for (RMContainer c : skippedAMContainerlist) { // Got required amount of resources for preemption, can stop now - if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, - Resources.none())) { + if (resToObtainByPartition.isEmpty()) { break; } // Once skippedAMSize reaches down to maxAMCapacityForThisQueue, - // container selection iteration for preemption will be stopped. + // container selection iteration for preemption will be stopped. if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, maxAMCapacityForThisQueue)) { break; } - Set contToPrempt = preemptMap.get(c - .getApplicationAttemptId()); - if (null == contToPrempt) { - contToPrempt = new HashSet(); - preemptMap.put(c.getApplicationAttemptId(), contToPrempt); + + boolean preempted = + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + if (preempted) { + Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } - contToPrempt.add(c); - - Resources.subtractFrom(resToObtain, c.getContainer().getResource()); - Resources.subtractFrom(skippedAMSize, c.getContainer() - .getResource()); } skippedAMContainerlist.clear(); } @@ -627,71 +734,59 @@ /** * Given a target preemption for a specific application, select containers * to preempt (after unreserving all reservations for that app).
- * - * @param app - * @param clusterResource - * @param rsrcPreempt - * @return Set Set of RMContainers */ - private Set preemptFrom(FiCaSchedulerApp app, - Resource clusterResource, Resource rsrcPreempt, - List skippedAMContainerlist, Resource skippedAMSize) { - Set ret = new HashSet(); + private void preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Map resToObtainByPartition, + List skippedAMContainerlist, Resource skippedAMSize, + Map> preemptMap) { ApplicationAttemptId appId = app.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Looking at application=" + app.getApplicationAttemptId() + + " resourceToObtain=" + resToObtainByPartition); + } // first drop reserved containers towards rsrcPreempt - List reservations = + List reservedContainers = new ArrayList(app.getReservedContainers()); - for (RMContainer c : reservations) { - if (Resources.lessThanOrEqual(rc, clusterResource, - rsrcPreempt, Resources.none())) { - return ret; + for (RMContainer c : reservedContainers) { + if (resToObtainByPartition.isEmpty()) { + return; } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + if (!observeOnly) { dispatcher.handle(new ContainerPreemptEvent(appId, c, ContainerPreemptEventType.DROP_RESERVATION)); } - Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } // if more resources are to be freed go through all live containers in // reverse priority and reverse allocation order and mark them for // preemption - List containers = + List liveContainers = new ArrayList(app.getLiveContainers()); - sortContainers(containers); + sortContainers(liveContainers); - for (RMContainer c : containers) { - if (Resources.lessThanOrEqual(rc, clusterResource, - rsrcPreempt, Resources.none())) { - return ret; + for (RMContainer c : liveContainers) { + if (resToObtainByPartition.isEmpty()) { + return; } + // Skip AM Container from preemption for now. if (c.isAMContainer()) { skippedAMContainerlist.add(c); - Resources.addTo(skippedAMSize, c.getContainer().getResource()); - continue; - } - // skip Labeled resource - if(isLabeledContainer(c)){ + Resources.addTo(skippedAMSize, c.getAllocatedResource()); continue; } - ret.add(c); - Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); - } - return ret; - } - - /** - * Checking if given container is a labeled container - * - * @param c - * @return true/false - */ - private boolean isLabeledContainer(RMContainer c) { - return labels.containsKey(c.getAllocatedNode()); + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap); + } } /** @@ -733,32 +828,48 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * the leaves. Finally it aggregates pending resources in each queue and rolls * it up to higher levels. 
* - * @param root the root of the CapacityScheduler queue hierarchy - * @param clusterResources the total amount of resources in the cluster + * @param curQueue current queue which we are looking at now + * @param partitionResource the total amount of resources in the cluster * @return the root of the cloned queue hierarchy */ - private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { - TempQueue ret; - synchronized (root) { - String queueName = root.getQueueName(); - float absUsed = root.getAbsoluteUsedCapacity(); - float absCap = root.getAbsoluteCapacity(); - float absMaxCap = root.getAbsoluteMaximumCapacity(); - boolean preemptionDisabled = root.getPreemptionDisabled(); - - Resource current = Resources.multiply(clusterResources, absUsed); - Resource guaranteed = Resources.multiply(clusterResources, absCap); - Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap); + private TempQueuePerPartition cloneQueues(CSQueue curQueue, + Resource partitionResource, String partitionToLookAt) { + TempQueuePerPartition ret; + synchronized (curQueue) { + String queueName = curQueue.getQueueName(); + QueueCapacities qc = curQueue.getQueueCapacities(); + float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt); + float absCap = qc.getAbsoluteCapacity(partitionToLookAt); + float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); + boolean preemptionDisabled = curQueue.getPreemptionDisabled(); + + Resource current = Resources.multiply(partitionResource, absUsed); + Resource guaranteed = Resources.multiply(partitionResource, absCap); + Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap); + + // when the partition is a non-exclusive partition, the actual maxCapacity + // could be more than the specified maxCapacity + try { + if (!scheduler.getRMContext().getNodeLabelManager() + .isExclusiveNodeLabel(partitionToLookAt)) { + maxCapacity = + Resources.max(rc, partitionResource, maxCapacity, current); + } + } catch (IOException e) { + // This may be caused by a partition being removed while the capacity + // monitor is running; just ignore the error, it will be corrected on the next check.
+ } Resource extra = Resource.newInstance(0, 0); - if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) { + if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) { extra = Resources.subtract(current, guaranteed); } - if (root instanceof LeafQueue) { - LeafQueue l = (LeafQueue) root; - Resource pending = l.getTotalResourcePending(); - ret = new TempQueue(queueName, current, pending, guaranteed, - maxCapacity, preemptionDisabled); + if (curQueue instanceof LeafQueue) { + LeafQueue l = (LeafQueue) curQueue; + Resource pending = + l.getQueueResourceUsage().getPending(partitionToLookAt); + ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, + maxCapacity, preemptionDisabled, partitionToLookAt); if (preemptionDisabled) { ret.untouchableExtra = extra; } else { @@ -767,17 +878,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic ret.setLeafQueue(l); } else { Resource pending = Resource.newInstance(0, 0); - ret = new TempQueue(root.getQueueName(), current, pending, guaranteed, - maxCapacity, false); + ret = + new TempQueuePerPartition(curQueue.getQueueName(), current, pending, + guaranteed, maxCapacity, false, partitionToLookAt); Resource childrensPreemptable = Resource.newInstance(0, 0); - for (CSQueue c : root.getChildQueues()) { - TempQueue subq = cloneQueues(c, clusterResources); + for (CSQueue c : curQueue.getChildQueues()) { + TempQueuePerPartition subq = + cloneQueues(c, partitionResource, partitionToLookAt); Resources.addTo(childrensPreemptable, subq.preemptableExtra); ret.addChild(subq); } // untouchableExtra = max(extra - childrenPreemptable, 0) if (Resources.greaterThanOrEqual( - rc, clusterResources, childrensPreemptable, extra)) { + rc, partitionResource, childrensPreemptable, extra)) { ret.untouchableExtra = Resource.newInstance(0, 0); } else { ret.untouchableExtra = @@ -785,52 +898,87 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } } } + addTempQueuePartition(ret); return ret; } // simple printout function that reports internal queue state (useful for // plotting) - private void logToCSV(List unorderedqueues){ - List queues = new ArrayList(unorderedqueues); - Collections.sort(queues, new Comparator(){ - @Override - public int compare(TempQueue o1, TempQueue o2) { - return o1.queueName.compareTo(o2.queueName); - }}); + private void logToCSV(List leafQueueNames){ + Collections.sort(leafQueueNames); String queueState = " QUEUESTATE: " + clock.getTime(); StringBuilder sb = new StringBuilder(); sb.append(queueState); - for (TempQueue tq : queues) { + + for (String queueName : leafQueueNames) { + TempQueuePerPartition tq = + getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL); sb.append(", "); tq.appendLogString(sb); } LOG.debug(sb.toString()); } + private void addTempQueuePartition(TempQueuePerPartition queuePartition) { + String queueName = queuePartition.queueName; + + Map queuePartitions; + if (null == (queuePartitions = queueToPartitions.get(queueName))) { + queuePartitions = new HashMap(); + queueToPartitions.put(queueName, queuePartitions); + } + queuePartitions.put(queuePartition.partition, queuePartition); + } + + /** + * Get queue partition by given queueName and partitionName + */ + private TempQueuePerPartition getQueueByPartition(String queueName, + String partition) { + Map partitionToQueues = null; + if (null == (partitionToQueues = queueToPartitions.get(queueName))) { + return null; + } + return partitionToQueues.get(partition); + } + + /** 
+ * Get all queue partitions by given queueName + */ + private Collection getQueuePartitions(String queueName) { + if (!queueToPartitions.containsKey(queueName)) { + return null; + } + return queueToPartitions.get(queueName).values(); + } + /** * Temporary data-structure tracking resource availability, pending resource - * need, current utilization. Used to clone {@link CSQueue}. + * need, current utilization. This is per-queue-per-partition data structure */ - static class TempQueue { + static class TempQueuePerPartition { final String queueName; final Resource current; final Resource pending; final Resource guaranteed; final Resource maxCapacity; + final String partition; Resource idealAssigned; Resource toBePreempted; + // For logging purpose Resource actuallyPreempted; Resource untouchableExtra; Resource preemptableExtra; double normalizedGuarantee; - final ArrayList children; + final ArrayList children; LeafQueue leafQueue; boolean preemptionDisabled; - TempQueue(String queueName, Resource current, Resource pending, - Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) { + TempQueuePerPartition(String queueName, Resource current, Resource pending, + Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled, + String partition) { this.queueName = queueName; this.current = current; this.pending = pending; @@ -840,10 +988,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic this.actuallyPreempted = Resource.newInstance(0, 0); this.toBePreempted = Resource.newInstance(0, 0); this.normalizedGuarantee = Float.NaN; - this.children = new ArrayList(); + this.children = new ArrayList(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); this.preemptionDisabled = preemptionDisabled; + this.partition = partition; } public void setLeafQueue(LeafQueue l){ @@ -855,19 +1004,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * When adding a child we also aggregate its pending resource needs. 
* @param q the child queue to add to this queue */ - public void addChild(TempQueue q) { + public void addChild(TempQueuePerPartition q) { assert leafQueue == null; children.add(q); Resources.addTo(pending, q.pending); } - public void addChildren(ArrayList queues) { + public void addChildren(ArrayList queues) { assert leafQueue == null; children.addAll(queues); } - public ArrayList getChildren(){ + public ArrayList getChildren(){ return children; } @@ -909,7 +1058,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic public void printAll() { LOG.info(this.toString()); - for (TempQueue sub : this.getChildren()) { + for (TempQueuePerPartition sub : this.getChildren()) { sub.printAll(); } } @@ -942,7 +1091,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } - static class TQComparator implements Comparator { + static class TQComparator implements Comparator { private ResourceCalculator rc; private Resource clusterRes; @@ -952,7 +1101,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } @Override - public int compare(TempQueue tq1, TempQueue tq2) { + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { return -1; } @@ -965,7 +1114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // Calculates idealAssigned / guaranteed // TempQueues with 0 guarantees are always considered the most over // capacity and therefore considered last for resources. - private double getIdealPctOfGuaranteed(TempQueue q) { + private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { double pctOver = Integer.MAX_VALUE; if (q != null && Resources.greaterThan( rc, clusterRes, q.guaranteed, Resources.none())) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2750d4e..316a450 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @SuppressWarnings({"unchecked", "rawtypes"}) -public class RMContainerImpl implements RMContainer { +public class RMContainerImpl implements RMContainer, Comparable { private static final Log LOG = LogFactory.getLog(RMContainerImpl.class); @@ -615,4 +615,30 @@ public class RMContainerImpl implements RMContainer { } return nodeLabelExpression; } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RMContainer) { + if (null != getContainerId()) { + return 
getContainerId().equals(((RMContainer) obj).getContainerId()); + } + } + return false; + } + + @Override + public int hashCode() { + if (null != getContainerId()) { + return getContainerId().hashCode(); + } + return super.hashCode(); + } + + @Override + public int compareTo(RMContainer o) { + if (containerId != null && o.getContainerId() != null) { + return containerId.compareTo(o.getContainerId()); + } + return -1; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1e1623d..48c7f2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -153,7 +153,7 @@ public class CapacityScheduler extends static final PartitionedQueueComparator partitionedQueueComparator = new PartitionedQueueComparator(); - static final Comparator applicationComparator = + public static final Comparator applicationComparator = new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 22aafaa..56ade84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -68,9 +68,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; @@ -118,11 +119,16 @@ public class LeafQueue extends AbstractCSQueue { private final QueueResourceLimitsInfo queueResourceLimitsInfo = new QueueResourceLimitsInfo(); - + private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; private OrderingPolicy orderingPolicy = new FifoOrderingPolicy(); + + // record all ignore-partition-exclusivity RMContainers; this will be used to do + // preemption. Key is the partition the RMContainer is allocated on + private Map> ignorePartitionExclusivityRMContainers = + new HashMap<>(); public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -921,11 +927,16 @@ public class LeafQueue extends AbstractCSQueue { Resource assigned = assignment.getResource(); if (Resources.greaterThan( resourceCalculator, clusterResource, assigned, Resources.none())) { + // Get reserved or allocated container from application + RMContainer reservedOrAllocatedRMContainer = + application.getRMContainer(assignment + .getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId()); // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - node.getPartition()); + node.getPartition(), reservedOrAllocatedRMContainer); // Don't reset scheduling opportunities for offswitch assignments // otherwise the app will be delayed for each non-local assignment.
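// ----------------------------------------------------------------------
// Editorial sketch (not part of the commit): the per-partition TreeSet
// declared above relies on RMContainerImpl now implementing
// Comparable<RMContainer> (see the RMContainerImpl.java hunk earlier in
// this patch), ordered by ContainerId. Ids grow monotonically, so the
// set keeps containers in allocation order, and the preemption policy's
// descendingSet() walk visits the most recently allocated containers,
// i.e. later-submitted applications' containers, first:
//
//   TreeSet<RMContainer> rmContainers = ...;  // ordered by ContainerId
//   for (RMContainer c : rmContainers.descendingSet()) {
//     // newest container first; stop once nothing more must be obtained
//   }
// ----------------------------------------------------------------------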
@@ -1720,7 +1731,7 @@ public class LeafQueue extends AbstractCSQueue { orderingPolicy.containerReleased(application, rmContainer); releaseResource(clusterResource, application, - container.getResource(), node.getPartition()); + container.getResource(), node.getPartition(), rmContainer); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1738,9 +1749,22 @@ public class LeafQueue extends AbstractCSQueue { synchronized void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, - String nodePartition) { + String nodePartition, RMContainer rmContainer) { super.allocateResource(clusterResource, resource, nodePartition); + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) + && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + TreeSet rmContainers = null; + if (null == (rmContainers = + ignorePartitionExclusivityRMContainers.get(nodePartition))) { + rmContainers = new TreeSet<>(); + ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers); + } + rmContainers.add(rmContainer); + } + // Update user metrics String userName = application.getUser(); User user = getUser(userName); @@ -1760,10 +1784,25 @@ public class LeafQueue extends AbstractCSQueue { } } - synchronized void releaseResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource, String nodePartition) { + synchronized void releaseResource(Resource clusterResource, + FiCaSchedulerApp application, Resource resource, String nodePartition, + RMContainer rmContainer) { super.releaseResource(clusterResource, resource, nodePartition); + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) + && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { + Set rmContainers = + ignorePartitionExclusivityRMContainers.get(nodePartition); + rmContainers.remove(rmContainer); + if (rmContainers.isEmpty()) { + ignorePartitionExclusivityRMContainers.remove(nodePartition); + } + } + } + // Update user metrics String userName = application.getUser(); User user = getUser(userName); @@ -1912,7 +1951,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), rmContainer); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1953,7 +1992,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1971,7 +2010,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), 
rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() @@ -1982,6 +2021,17 @@ } } + /** + * Return all ignore-partition-exclusivity RMContainers in the LeafQueue; this + * will be used by the preemption policy. Use of the returned + * ignorePartitionExclusivityRMContainers must be protected by the LeafQueue + * synchronized lock + */ + public synchronized Map> + getIgnoreExclusivityRMContainers() { + return ignorePartitionExclusivityRMContainers; + } + public void setCapacity(float capacity) { queueCapacities.setCapacity(capacity); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java index c5c067d..5158255 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -18,16 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + @InterfaceAudience.Private @InterfaceStability.Unstable public class AssignmentInformation { @@ -117,4 +118,24 @@ public List getReservationDetails() { return operationDetails.get(Operation.RESERVATION); } + + private ContainerId getFirstContainerIdFromOperation(Operation op) { + if (null != operationDetails.get(op)) { + List assignDetails = + operationDetails.get(op); + if (!assignDetails.isEmpty()) { + return assignDetails.get(0).containerId; + } + } + return null; + } + + public ContainerId getFirstAllocatedOrReservedContainerId() { + ContainerId containerId = + getFirstContainerIdFromOperation(Operation.ALLOCATION); + if (null != containerId) { + return containerId; + } + return getFirstContainerIdFromOperation(Operation.RESERVATION); + } }
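For context, the new accessor is consumed in LeafQueue#assignContainers (see the LeafQueue.java hunks above). A minimal sketch of that call path, using the names introduced by this patch (an editorial illustration, not code from the commit):

    // Resolve the first allocated (or, failing that, reserved) container
    // of this assignment back to its RMContainer, so allocateResource()
    // can record it in the ignore-partition-exclusivity map when needed.
    ContainerId firstId = assignment.getAssignmentInformation()
        .getFirstAllocatedOrReservedContainerId();
    RMContainer reservedOrAllocated = application.getRMContainer(firstId);
    allocateResource(clusterResource, application, assigned,
        node.getPartition(), reservedOrAllocated);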
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d497f6ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 9e8b769..6c0ed6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -25,11 +25,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -37,27 +38,17 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Iterator; -import java.util.Map; import java.util.NavigableSet; import java.util.Random; -import java.util.Set; import java.util.StringTokenizer; import java.util.TreeSet; -import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -76,23 +67,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import org.mortbay.log.Log;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestProportionalCapacityPreemptionPolicy {
 
@@ -799,50 +794,6 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @Test
-  public void testIdealAllocationForLabels() {
-    int[][] qData = new int[][] {
-        // /  A  B
-        { 80, 40, 40 }, // abs
-        { 80, 80, 80 }, // maxcap
-        { 80, 80, 0 },  // used
-        { 70, 20, 50 }, // pending
-        { 0, 0, 0 },    // reserved
-        { 5, 4, 1 },    // apps
-        { -1, 1, 1 },   // req granularity
-        { 2, 0, 0 },    // subqueues
-    };
-    setAMContainer = true;
-    setLabeledContainer = true;
-    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
-    NodeId node = NodeId.newInstance("node1", 0);
-    Set<String> labelSet = new HashSet<String>();
-    labelSet.add("x");
-    labels.put(node, labelSet);
-    when(lm.getNodeLabels()).thenReturn(labels);
-    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
-    // Subtracting Label X resources from cluster resources
-    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
-        Resources.clone(Resource.newInstance(80, 0)));
-    clusterResources.setMemory(100);
-    policy.editSchedule();
-
-    // By skipping AM Container and Labeled container, all other 18 containers
-    // of appD will be
-    // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
-
-    // By skipping AM Container and Labeled container, all other 18 containers
-    // of appC will be
-    // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
-
-    // rest 4 containers from appB will be preempted
-    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
-    setAMContainer = false;
-    setLabeledContainer = false;
-  }
-
-  @Test
   public void testPreemptSkippedAMContainers() {
     int[][] qData = new int[][] {
         // /  A  B
@@ -944,6 +895,12 @@ public class TestProportionalCapacityPreemptionPolicy {
     clusterResources = Resource.newInstance(
         leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
+    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+        clusterResources);
+
+    SchedulerNode mNode = mock(SchedulerNode.class);
+    when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
+    when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
     return policy;
   }
 
@@ -965,11 +922,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     float tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
-    when(root.getQueueName()).thenReturn("/");
+    when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
     when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
     when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
-    when(root.getQueuePath()).thenReturn("root");
+    QueueCapacities rootQc = new QueueCapacities(true);
+    rootQc.setAbsoluteUsedCapacity(used[0] / tot);
+    rootQc.setAbsoluteCapacity(abs[0] / tot);
+    rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
+    when(root.getQueueCapacities()).thenReturn(rootQc);
+    when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
     boolean preemptionDisabled = mockPreemptionStatus("root");
     when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
 
@@ -987,6 +949,14 @@ public class TestProportionalCapacityPreemptionPolicy {
       when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
       when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
       when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+
+      // We need to set these fields in QueueCapacities as well
+      QueueCapacities qc = new QueueCapacities(false);
+      qc.setAbsoluteUsedCapacity(used[i] / tot);
+      qc.setAbsoluteCapacity(abs[i] / tot);
+      qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
+      when(q.getQueueCapacities()).thenReturn(qc);
+
       String parentPathName = p.getQueuePath();
       parentPathName = (parentPathName == null) ? "root" : parentPathName;
       String queuePathName = (parentPathName+"."+queueName).replace("/","root");
@@ -1028,6 +998,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     return pq;
   }
 
+  @SuppressWarnings("rawtypes")
   LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
       int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
@@ -1035,6 +1006,10 @@ public class TestProportionalCapacityPreemptionPolicy {
         new ArrayList<ApplicationAttemptId>();
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
+    // need to set pending resource in resource usage as well
+    ResourceUsage ru = new ResourceUsage();
+    ru.setPending(Resource.newInstance(pending[i], 0));
+    when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
         new Comparator<FiCaSchedulerApp>() {
@@ -1124,6 +1099,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
     when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+    when(mC.getAllocatedResource()).thenReturn(r);
     if (priority.AMCONTAINER.getValue() == cpriority) {
       when(mC.isAMContainer()).thenReturn(true);
     }
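
For readers unfamiliar with QueueCapacities, which the test plumbing above mirrors the absolute-capacity fields into: it tracks one set of capacity values per node partition, which is what lets the rewritten preemption policy reason per-label. The sketch below is illustrative only and not part of this commit; it assumes per-label setter and getter overloads on QueueCapacities (the no-argument forms used in the test above read and write the default, empty-label partition).

    import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;

    // Hypothetical usage sketch, for illustration only.
    public class QueueCapacitiesSketch {
      public static QueueCapacities perPartitionExample() {
        // 'false' means this is not the root queue's capacities object.
        QueueCapacities qc = new QueueCapacities(false);
        // No-label overload: writes the default (empty-label) partition,
        // the same form the mock setup above uses.
        qc.setAbsoluteCapacity(0.5f);
        // Per-label overload (assumed): the same field for partition "x".
        qc.setAbsoluteCapacity("x", 0.25f);
        // Reads follow the same pattern, keyed by partition name.
        float defaultAbs = qc.getAbsoluteCapacity(RMNodeLabelsManager.NO_LABEL);
        float labeledAbs = qc.getAbsoluteCapacity("x");
        System.out.println("default=" + defaultAbs + " x=" + labeledAbs);
        return qc;
      }
    }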