From: wangda@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 31 Oct 2016 22:26:07 -0000
Message-Id: <3231145f174a4582a84db96e32f19f26@git.apache.org>
Subject: [2/2] hadoop git commit: YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)

YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support.
(Sunil G via wangda)

(cherry picked from commit 90dd3a8148468ac37a3f2173ad8d45e38bfcb0c9)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cef281ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cef281ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cef281ab

Branch: refs/heads/branch-2
Commit: cef281abe6c7db4dc5c88018a7a2fa810a239c26
Parents: 9d13a13
Author: Wangda Tan
Authored: Mon Oct 31 15:18:31 2016 -0700
Committer: Wangda Tan
Committed: Mon Oct 31 15:23:16 2016 -0700

----------------------------------------------------------------------
 .../AbstractPreemptableResourceCalculator.java  | 244 ++++
 .../capacity/AbstractPreemptionEntity.java      |  98 +++
 .../CapacitySchedulerPreemptionContext.java     |  14 +
 .../CapacitySchedulerPreemptionUtils.java       | 119 ++-
 .../capacity/FifoCandidatesSelector.java        | 127 +--
 .../FifoIntraQueuePreemptionPlugin.java         | 459 ++++++++++
 .../capacity/IntraQueueCandidatesSelector.java  | 238 +++++
 .../IntraQueuePreemptionComputePlugin.java      |  39 +
 .../capacity/PreemptableResourceCalculator.java | 183 +---
 .../capacity/PreemptionCandidatesSelector.java  |  29 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  86 +-
 .../monitor/capacity/TempAppPerPartition.java   | 101 +++
 .../monitor/capacity/TempQueuePerPartition.java | 142 ++-
 .../CapacitySchedulerConfiguration.java         |  31 +
 .../scheduler/capacity/LeafQueue.java           |  39 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  18 +
 ...alCapacityPreemptionPolicyMockFramework.java | 126 ++-
 ...ionalCapacityPreemptionPolicyIntraQueue.java | 868 +++++++++++++++++++
 18 files changed, 2549 insertions(+), 412 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
new file mode 100644
index 0000000..8255a30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link PreemptionCandidatesSelector}. + */ +public class AbstractPreemptableResourceCalculator { + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + private boolean isReservedPreemptionCandidatesSelector; + + static class TQComparator implements Comparator { + private ResourceCalculator rc; + private Resource clusterRes; + + TQComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { + if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { + return -1; + } + if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { + return 1; + } + return 0; + } + + // Calculates idealAssigned / guaranteed + // TempQueues with 0 guarantees are always considered the most over + // capacity and therefore considered last for resources. + private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { + double pctOver = Integer.MAX_VALUE; + if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(), + Resources.none())) { + pctOver = Resources.divide(rc, clusterRes, q.idealAssigned, + q.getGuaranteed()); + } + return (pctOver); + } + } + + /** + * PreemptableResourceCalculator constructor. + * + * @param preemptionContext context + * @param isReservedPreemptionCandidatesSelector + * this will be set by different implementation of candidate + * selectors, please refer to TempQueuePerPartition#offer for + * details. + */ + public AbstractPreemptableResourceCalculator( + CapacitySchedulerPreemptionContext preemptionContext, + boolean isReservedPreemptionCandidatesSelector) { + context = preemptionContext; + rc = preemptionContext.getResourceCalculator(); + this.isReservedPreemptionCandidatesSelector = + isReservedPreemptionCandidatesSelector; + } + + /** + * Given a set of queues compute the fix-point distribution of unassigned + * resources among them. As pending request of a queue are exhausted, the + * queue is removed from the set and remaining capacity redistributed among + * remaining queues. The distribution is weighted based on guaranteed + * capacity, unless asked to ignoreGuarantee, in which case resources are + * distributed uniformly. + * + * @param totGuarant + * total guaranteed resource + * @param qAlloc + * List of child queues + * @param unassigned + * Unassigned resource per queue + * @param ignoreGuarantee + * ignore guarantee per queue. + */ + protected void computeFixpointAllocation(Resource totGuarant, + Collection qAlloc, Resource unassigned, + boolean ignoreGuarantee) { + // Prior to assigning the unused resources, process each queue as follows: + // If current > guaranteed, idealAssigned = guaranteed + untouchable extra + // Else idealAssigned = current; + // Subtract idealAssigned resources from unassigned. 
+    // If the queue has all of its needs met (that is, if
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, totGuarant);
+    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+        tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
+      Resource used = q.getUsed();
+
+      if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
+        q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(used);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (used + pending), q needs more resources, so
+      // add it to the list of underserved queues, ordered by need.
+      Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
+      if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
+    // Assign all cluster resources until there is no more demand, or no
+    // resources are left.
+    while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
+        unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+      // We compute normalizedGuarantee based on the currently active queues.
+      resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
+          orderedByNeed, tqComparator);
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
+            sub.normalizedGuarantee, Resource.newInstance(1, 1));
+        Resource wQidle = sub.offer(wQavail, rc, totGuarant,
+            isReservedPreemptionCandidatesSelector);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+        if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+
+    // Sometimes it is possible that all queues are properly served, so
+    // intra-queue preemption will not try any preemption. However, there may
+    // still be imbalances within a queue; hence make sure all queues are
+    // added to the list.
+    while (!orderedByNeed.isEmpty()) {
+      TempQueuePerPartition q1 = orderedByNeed.remove();
+      context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+    }
+  }
+
+  /**
+   * Computes a normalizedGuarantee capacity based on active queues.
+   *
+   * @param clusterResource
+   *          the total amount of resources in the cluster
+   * @param queues
+   *          the list of queues to consider
+   * @param ignoreGuar
+   *          ignore guarantee.
+ */ + private void resetCapacity(Resource clusterResource, + Collection queues, boolean ignoreGuar) { + Resource activeCap = Resource.newInstance(0, 0); + + if (ignoreGuar) { + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = 1.0f / queues.size(); + } + } else { + for (TempQueuePerPartition q : queues) { + Resources.addTo(activeCap, q.getGuaranteed()); + } + for (TempQueuePerPartition q : queues) { + q.normalizedGuarantee = Resources.divide(rc, clusterResource, + q.getGuaranteed(), activeCap); + } + } + } + + // Take the most underserved TempQueue (the one on the head). Collect and + // return the list of all queues that have the same idealAssigned + // percentage of guaranteed. + private Collection getMostUnderservedQueues( + PriorityQueue orderedByNeed, + TQComparator tqComparator) { + ArrayList underserved = new ArrayList<>(); + while (!orderedByNeed.isEmpty()) { + TempQueuePerPartition q1 = orderedByNeed.remove(); + underserved.add(q1); + + // Add underserved queues in order for later uses + context.addPartitionToUnderServedQueues(q1.queueName, q1.partition); + TempQueuePerPartition q2 = orderedByNeed.peek(); + // q1's pct of guaranteed won't be larger than q2's. If it's less, then + // return what has already been collected. Otherwise, q1's pct of + // guaranteed == that of q2, so add q2 to underserved list during the + // next pass. + if (q2 == null || tqComparator.compare(q1, q2) < 0) { + if (null != q2) { + context.addPartitionToUnderServedQueues(q2.queueName, q2.partition); + } + return underserved; + } + } + return underserved; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java new file mode 100644 index 0000000..dbd1f0a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; + + +/** + * Abstract temporary data-structure for tracking resource availability,pending + * resource need, current utilization for app/queue. + */ +public class AbstractPreemptionEntity { + // Following fields are copied from scheduler + final String queueName; + + protected final Resource current; + protected final Resource amUsed; + protected final Resource reserved; + protected Resource pending; + + // Following fields are settled and used by candidate selection policies + Resource idealAssigned; + Resource toBePreempted; + Resource selected; + private Resource actuallyToBePreempted; + private Resource toBePreemptFromOther; + + AbstractPreemptionEntity(String queueName, Resource usedPerPartition, + Resource amUsedPerPartition, Resource reserved, + Resource pendingPerPartition) { + this.queueName = queueName; + this.current = usedPerPartition; + this.pending = pendingPerPartition; + this.reserved = reserved; + this.amUsed = amUsedPerPartition; + + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyToBePreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.toBePreemptFromOther = Resource.newInstance(0, 0); + this.selected = Resource.newInstance(0, 0); + } + + public Resource getUsed() { + return current; + } + + public Resource getUsedDeductAM() { + return Resources.subtract(current, amUsed); + } + + public Resource getAMUsed() { + return amUsed; + } + + public Resource getPending() { + return pending; + } + + public Resource getReserved() { + return reserved; + } + + public Resource getActuallyToBePreempted() { + return actuallyToBePreempted; + } + + public void setActuallyToBePreempted(Resource actuallyToBePreempted) { + this.actuallyToBePreempted = actuallyToBePreempted; + } + + public Resource getToBePreemptFromOther() { + return toBePreemptFromOther; + } + + public void setToBePreemptFromOther(Resource toBePreemptFromOther) { + this.toBePreemptFromOther = toBePreemptFromOther; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index c52127d..982b1f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.Set; interface CapacitySchedulerPreemptionContext { @@ -49,4 +51,16 @@ interface CapacitySchedulerPreemptionContext { Set getLeafQueueNames(); Set getAllPartitions(); + + int getClusterMaxApplicationPriority(); + + Resource getPartitionResource(String partition); + + LinkedHashSet getUnderServedQueuesPerPartition(String partition); + + void addPartitionToUnderServedQueues(String queueName, String partition); + + float getMinimumThresholdForIntraQueuePreemption(); + + float getMaxAllowableLimitForIntraQueuePreemption(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index 42d8730..abad2a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -40,7 +43,8 @@ public class CapacitySchedulerPreemptionUtils { continue; } - // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0 + // Only add resToObtainByPartition when actuallyToBePreempted resource >= + // 0 if (Resources.greaterThan(context.getResourceCalculator(), clusterResource, qT.getActuallyToBePreempted(), Resources.none())) { resToObtainByPartition.put(qT.partition, @@ -57,8 +61,8 @@ public class CapacitySchedulerPreemptionUtils { return false; } - Set containers = selectedCandidates.get( - container.getApplicationAttemptId()); + Set containers = selectedCandidates + .get(container.getApplicationAttemptId()); if (containers == null) { return false; } @@ -70,8 +74,8 @@ public class CapacitySchedulerPreemptionUtils { Map> selectedCandidates) { for (Set containers : selectedCandidates.values()) { for 
(RMContainer c : containers) { - SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode( - c.getAllocatedNode()); + SchedulerNode schedulerNode = context.getScheduler() + .getSchedulerNode(c.getAllocatedNode()); if (null == schedulerNode) { continue; } @@ -89,8 +93,113 @@ public class CapacitySchedulerPreemptionUtils { if (null != res) { tq.deductActuallyToBePreempted(context.getResourceCalculator(), tq.totalPartitionResource, res); + Collection tas = tq.getApps(); + if (null == tas || tas.isEmpty()) { + continue; + } + + deductPreemptableResourcePerApp(context, tq.totalPartitionResource, + tas, res, partition); } } } } + + private static void deductPreemptableResourcePerApp( + CapacitySchedulerPreemptionContext context, + Resource totalPartitionResource, Collection tas, + Resource res, String partition) { + for (TempAppPerPartition ta : tas) { + ta.deductActuallyToBePreempted(context.getResourceCalculator(), + totalPartitionResource, res, partition); + } + } + + /** + * Invoke this method to preempt container based on resToObtain. + * + * @param rc + * resource calculator + * @param context + * preemption context + * @param resourceToObtainByPartitions + * map to hold resource to obtain per partition + * @param rmContainer + * container + * @param clusterResource + * total resource + * @param preemptMap + * map to hold preempted containers + * @param totalPreemptionAllowed + * total preemption allowed per round + * @return should we preempt rmContainer. If we should, deduct from + * resourceToObtainByPartition + */ + public static boolean tryPreemptContainerAndDeductResToObtain( + ResourceCalculator rc, CapacitySchedulerPreemptionContext context, + Map resourceToObtainByPartitions, + RMContainer rmContainer, Resource clusterResource, + Map> preemptMap, + Resource totalPreemptionAllowed) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account resource of a container twice or more + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + String nodePartition = getPartitionByNodeId(context, + rmContainer.getAllocatedNode()); + Resource toObtainByPartition = resourceToObtainByPartitions + .get(nodePartition); + + if (null != toObtainByPartition + && Resources.greaterThan(rc, clusterResource, toObtainByPartition, + Resources.none()) + && Resources.fitsIn(rc, clusterResource, + rmContainer.getAllocatedResource(), totalPreemptionAllowed)) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + Resources.subtractFrom(totalPreemptionAllowed, + rmContainer.getAllocatedResource()); + + // When we have no more resource need to obtain, remove from map. 
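For illustration, a standalone sketch (not part of this patch) of one plausible reading of deductPreemptableResourcePerApp: an amount already claimed by earlier selectors is drained from each app's remaining preemption target. Plain megabyte longs stand in for Resource, and the clamping policy is an assumption, not the committed semantics.

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: drain an already-selected preemption amount from a queue's apps,
// clamping each app's remaining target at zero.
public class DeductPerAppSketch {
  public static void main(String[] args) {
    Map<String, Long> actuallyToBePreemptedMb = new LinkedHashMap<>();
    actuallyToBePreemptedMb.put("app-1", 2048L);
    actuallyToBePreemptedMb.put("app-2", 1024L);

    long alreadySelectedMb = 1536L; // picked by a higher-priority selector
    for (Map.Entry<String, Long> e : actuallyToBePreemptedMb.entrySet()) {
      long deduct = Math.min(e.getValue(), alreadySelectedMb);
      e.setValue(e.getValue() - deduct);
      alreadySelectedMb -= deduct;
      if (alreadySelectedMb == 0) {
        break;
      }
    }
    System.out.println(actuallyToBePreemptedMb); // {app-1=512, app-2=1024}
  }
}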
+ if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resourceToObtainByPartitions.remove(nodePartition); + } + + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + private static String getPartitionByNodeId( + CapacitySchedulerPreemptionContext context, NodeId nodeId) { + return context.getScheduler().getSchedulerNode(nodeId).getPartition(); + } + + private static void addToPreemptMap( + Map> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set set = preemptMap.get(appAttemptId); + if (null == set) { + set = new HashSet<>(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } + + private static boolean preemptMapContains( + Map> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set rmContainers = preemptMap.get(attemptId); + if (null == rmContainers) { + return false; + } + return rmContainers.contains(rmContainer); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 9df395d..39336a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -18,11 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -33,9 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -111,9 +106,11 @@ public class FifoCandidatesSelector // Skip already selected containers continue; } - boolean preempted = tryPreemptContainerAndDeductResToObtain( - resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptionAllowed); + boolean preempted = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, + preemptionContext, resToObtainByPartition, c, + clusterResource, selectedCandidates, + 
totalPreemptionAllowed); if (!preempted) { continue; } @@ -184,9 +181,10 @@ public class FifoCandidatesSelector break; } - boolean preempted = - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, preemptMap, totalPreemptionAllowed); + boolean preempted = CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + resToObtainByPartition, c, clusterResource, preemptMap, + totalPreemptionAllowed); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } @@ -194,68 +192,6 @@ public class FifoCandidatesSelector skippedAMContainerlist.clear(); } - private boolean preemptMapContains( - Map> preemptMap, - ApplicationAttemptId attemptId, RMContainer rmContainer) { - Set rmContainers; - if (null == (rmContainers = preemptMap.get(attemptId))) { - return false; - } - return rmContainers.contains(rmContainer); - } - - /** - * Return should we preempt rmContainer. If we should, deduct from - * resourceToObtainByPartition - */ - private boolean tryPreemptContainerAndDeductResToObtain( - Map resourceToObtainByPartitions, - RMContainer rmContainer, Resource clusterResource, - Map> preemptMap, - Resource totalPreemptionAllowed) { - ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); - - // We will not account resource of a container twice or more - if (preemptMapContains(preemptMap, attemptId, rmContainer)) { - return false; - } - - String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); - Resource toObtainByPartition = - resourceToObtainByPartitions.get(nodePartition); - - if (null != toObtainByPartition && Resources.greaterThan(rc, - clusterResource, toObtainByPartition, Resources.none()) && Resources - .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(), - totalPreemptionAllowed)) { - Resources.subtractFrom(toObtainByPartition, - rmContainer.getAllocatedResource()); - Resources.subtractFrom(totalPreemptionAllowed, - rmContainer.getAllocatedResource()); - - // When we have no more resource need to obtain, remove from map. - if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, - Resources.none())) { - resourceToObtainByPartitions.remove(nodePartition); - } - if (LOG.isDebugEnabled()) { - LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer - .getContainerId() + " from partition=" + nodePartition + " queue=" - + rmContainer.getQueueName() + " to be preemption candidates"); - } - // Add to preemptMap - addToPreemptMap(preemptMap, attemptId, rmContainer); - return true; - } - - return false; - } - - private String getPartitionByNodeId(NodeId nodeId) { - return preemptionContext.getScheduler().getSchedulerNode(nodeId) - .getPartition(); - } - /** * Given a target preemption for a specific application, select containers * to preempt (after unreserving all reservation for that app). 
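Both selector paths above now funnel into the shared CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain. The following standalone sketch mirrors its bookkeeping with plain types (megabyte longs for Resource, strings and ints for the YARN ids); it is an illustration, not the Hadoop API.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeductResToObtainSketch {
  static boolean tryPreempt(Map<String, Long> toObtainByPartition,
      String partition, long containerMb, long[] totalAllowedMb,
      Map<String, Set<Integer>> preemptMap, String appAttempt,
      int containerId) {
    Set<Integer> already = preemptMap.get(appAttempt);
    if (already != null && already.contains(containerId)) {
      return false; // never account the same container twice
    }
    Long toObtain = toObtainByPartition.get(partition);
    // Preempt only if this partition still owes resources and the container
    // fits in the remaining per-round budget.
    if (toObtain == null || toObtain <= 0 || containerMb > totalAllowedMb[0]) {
      return false;
    }
    long remaining = toObtain - containerMb;
    totalAllowedMb[0] -= containerMb;
    if (remaining <= 0) {
      toObtainByPartition.remove(partition); // demand met for this partition
    } else {
      toObtainByPartition.put(partition, remaining);
    }
    preemptMap.computeIfAbsent(appAttempt, k -> new HashSet<>())
        .add(containerId);
    return true;
  }

  public static void main(String[] args) {
    Map<String, Long> toObtain = new HashMap<>();
    toObtain.put("", 3000L); // default partition owes 3000 MB
    Map<String, Set<Integer>> preemptMap = new HashMap<>();
    long[] budgetMb = {4096L}; // totalPreemptionAllowed for this round
    System.out.println(
        tryPreempt(toObtain, "", 2048L, budgetMb, preemptMap, "app_1", 7));
    // -> true; the partition now owes 952 MB, budget is down to 2048 MB
    System.out.println(
        tryPreempt(toObtain, "", 2048L, budgetMb, preemptMap, "app_1", 7));
    // -> false; the same container is never accounted twice
  }
}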
@@ -267,10 +203,6 @@ public class FifoCandidatesSelector Map> selectedContainers, Resource totalPreemptionAllowed) { ApplicationAttemptId appId = app.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Looking at application=" + app.getApplicationAttemptId() - + " resourceToObtain=" + resToObtainByPartition); - } // first drop reserved containers towards rsrcPreempt List reservedContainers = @@ -285,8 +217,9 @@ public class FifoCandidatesSelector } // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, selectedContainers, totalPreemptionAllowed); + CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( + rc, preemptionContext, resToObtainByPartition, c, clusterResource, + selectedContainers, totalPreemptionAllowed); if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -327,39 +260,9 @@ public class FifoCandidatesSelector } // Try to preempt this container - tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, - clusterResource, selectedContainers, totalPreemptionAllowed); - } - } - - /** - * Compare by reversed priority order first, and then reversed containerId - * order - * @param containers - */ - @VisibleForTesting - static void sortContainers(List containers){ - Collections.sort(containers, new Comparator() { - @Override - public int compare(RMContainer a, RMContainer b) { - int schedKeyComp = b.getAllocatedSchedulerKey() - .compareTo(a.getAllocatedSchedulerKey()); - if (schedKeyComp != 0) { - return schedKeyComp; - } - return b.getContainerId().compareTo(a.getContainerId()); - } - }); - } - - private void addToPreemptMap( - Map> preemptMap, - ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set set; - if (null == (set = preemptMap.get(appAttemptId))) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); + CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( + rc, preemptionContext, resToObtainByPartition, c, clusterResource, + selectedContainers, totalPreemptionAllowed); } - set.add(containerToPreempt); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java new file mode 100644 index 0000000..757f567 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for + * priority and user-limit. + */ +public class FifoIntraQueuePreemptionPlugin + implements + IntraQueuePreemptionComputePlugin { + + protected final CapacitySchedulerPreemptionContext context; + protected final ResourceCalculator rc; + + private static final Log LOG = + LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class); + + public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc, + CapacitySchedulerPreemptionContext preemptionContext) { + this.context = preemptionContext; + this.rc = rc; + } + + @Override + public Map getResourceDemandFromAppsPerQueue( + String queueName, String partition) { + + Map resToObtainByPartition = new HashMap<>(); + TempQueuePerPartition tq = context + .getQueueByPartition(queueName, partition); + + Collection appsOrderedByPriority = tq.getApps(); + Resource actualPreemptNeeded = resToObtainByPartition.get(partition); + + // Updating pending resource per-partition level. + if (actualPreemptNeeded == null) { + actualPreemptNeeded = Resources.createResource(0, 0); + resToObtainByPartition.put(partition, actualPreemptNeeded); + } + + for (TempAppPerPartition a1 : appsOrderedByPriority) { + Resources.addTo(actualPreemptNeeded, a1.getActuallyToBePreempted()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Selected to preempt " + actualPreemptNeeded + + " resource from partition:" + partition); + } + return resToObtainByPartition; + } + + @Override + public void computeAppsIdealAllocation(Resource clusterResource, + Resource partitionBasedResource, TempQueuePerPartition tq, + Map> selectedCandidates, + Resource totalPreemptedResourceAllowed, + Resource queueReassignableResource, float maxAllowablePreemptLimit) { + + // 1. AM used resource can be considered as a frozen resource for now. + // Hence such containers in a queue can be omitted from the preemption + // calculation. 
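As an aside, the demand aggregation in getResourceDemandFromAppsPerQueue above boils down to folding each app's actuallyToBePreempted into a single per-partition figure. A minimal sketch with plain longs (the partition name is hypothetical):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerQueueDemandSketch {
  public static void main(String[] args) {
    // Each entry is one app's actuallyToBePreempted, in MB.
    List<Long> perAppToBePreemptedMb = Arrays.asList(512L, 0L, 1024L);
    Map<String, Long> resToObtainByPartition = new HashMap<>();
    String partition = "label-x"; // hypothetical partition name
    long demand = 0L;
    for (long perApp : perAppToBePreemptedMb) {
      demand += perApp;
    }
    resToObtainByPartition.put(partition, demand);
    System.out.println(resToObtainByPartition); // {label-x=1536}
  }
}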
+    Map<String, Resource> perUserAMUsed = new HashMap<>();
+    Resource amUsed = calculateUsedAMResourcesPerQueue(tq.partition,
+        tq.leafQueue, perUserAMUsed);
+    Resources.subtractFrom(queueReassignableResource, amUsed);
+
+    // 2. tq.leafQueue will not be null, as we validated it on the caller side.
+    Collection<FiCaSchedulerApp> apps = tq.leafQueue.getAllApplications();
+
+    // We do not need preemption for a single app.
+    if (apps.size() == 1) {
+      return;
+    }
+
+    // 3. Create all tempApps for internal calculation and return a list
+    // ordered from high priority to low priority.
+    TAPriorityComparator taComparator = new TAPriorityComparator();
+    PriorityQueue<TempAppPerPartition> orderedByPriority =
+        createTempAppForResCalculation(tq.partition, apps, taComparator);
+
+    // 4. Calculate idealAssigned per app, based on the queue's unallocated
+    // resource. Also return apps arranged from lower priority to higher
+    // priority.
+    TreeSet<TempAppPerPartition> orderedApps =
+        calculateIdealAssignedResourcePerApp(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            queueReassignableResource, orderedByPriority, perUserAMUsed);
+
+    // 5. A configurable limit defines how much of the queue's current
+    // capacity may become preemptable.
+    Resource maxIntraQueuePreemptable = Resources.multiply(tq.getGuaranteed(),
+        maxAllowablePreemptLimit);
+    if (Resources.greaterThan(rc, clusterResource, maxIntraQueuePreemptable,
+        tq.getActuallyToBePreempted())) {
+      Resources.subtractFrom(maxIntraQueuePreemptable,
+          tq.getActuallyToBePreempted());
+    } else {
+      maxIntraQueuePreemptable = Resource.newInstance(0, 0);
+    }
+
+    // 6. We have two configurations here: one is the intra-queue limit, the
+    // other is the per-round limit for any preemption. Take the minimum.
+    Resource preemptionLimit = Resources.min(rc, clusterResource,
+        maxIntraQueuePreemptable, totalPreemptedResourceAllowed);
+
+    // 7. From the lowest priority app onwards, calculate the toBePreempted
+    // resource based on demand.
+    calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
+        preemptionLimit);
+
+    // Save all apps (low to high) to the temp queue for further reference.
+    tq.addAllApps(orderedApps);
+
+    // 8. There is a chance that we may preempt for demand from the same
+    // priority level; such cases are validated out.
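Steps 5 and 6 above reduce to a small arithmetic rule. A standalone sketch with megabyte longs in place of Resource (the values are illustrative, not from this patch):

// Sketch: the per-queue intra-queue preemption limit is a slice of the
// queue's guarantee, less what is already marked for preemption, further
// capped by the per-round allowance.
public class PreemptionLimitSketch {
  static long intraQueuePreemptionLimit(long guaranteedMb,
      float maxAllowablePreemptLimit, long alreadyMarkedMb,
      long totalAllowedThisRoundMb) {
    long maxIntraQueuePreemptable =
        (long) (guaranteedMb * maxAllowablePreemptLimit);
    long headroom = Math.max(0L, maxIntraQueuePreemptable - alreadyMarkedMb);
    return Math.min(headroom, totalAllowedThisRoundMb);
  }

  public static void main(String[] args) {
    // 20% of a 10 GB guarantee, 512 MB already marked, 4 GB round budget:
    System.out.println(
        intraQueuePreemptionLimit(10240L, 0.2f, 512L, 4096L)); // 1536
  }
}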
+ validateOutSameAppPriorityFromDemand(clusterResource, + (TreeSet) tq.getApps()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition); + for (TempAppPerPartition tmpApp : tq.getApps()) { + LOG.debug(tmpApp); + } + } + } + + private void calculateToBePreemptedResourcePerApp(Resource clusterResource, + TreeSet orderedApps, Resource preemptionLimit) { + + for (TempAppPerPartition tmpApp : orderedApps) { + if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit, + Resources.none()) + || Resources.lessThanOrEqual(rc, clusterResource, tmpApp.getUsed(), + Resources.none())) { + continue; + } + + Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(), + tmpApp.idealAssigned); + Resources.subtractFrom(preemtableFromApp, tmpApp.selected); + Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed()); + + // Calculate toBePreempted from apps as follows: + // app.preemptable = min(max(app.used - app.selected - app.ideal, 0), + // intra_q_preemptable) + tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources + .max(rc, clusterResource, preemtableFromApp, Resources.none()), + preemptionLimit); + + preemptionLimit = Resources.subtract(preemptionLimit, + tmpApp.toBePreempted); + } + } + + /** + * Algorithm for calculating idealAssigned is as follows: + * For each partition: + * Q.reassignable = Q.used - Q.selected; + * + * # By default set ideal assigned 0 for app. + * app.idealAssigned as 0 + * # get user limit from scheduler. + * userLimitRes = Q.getUserLimit(userName) + * + * # initial all value to 0 + * Map userToAllocated + * + * # Loop from highest priority to lowest priority app to calculate ideal + * for app in sorted-by(priority) { + * if Q.reassignable < 0: + * break; + * + * if (user-to-allocated.get(app.user) < userLimitRes) { + * idealAssigned = min((userLimitRes - userToAllocated.get(app.user)), + * (app.used + app.pending - app.selected)) + * app.idealAssigned = min(Q.reassignable, idealAssigned) + * userToAllocated.get(app.user) += app.idealAssigned; + * } else { + * // skip this app because user-limit reached + * } + * Q.reassignable -= app.idealAssigned + * } + * + * @param clusterResource Cluster Resource + * @param partitionBasedResource resource per partition + * @param tq TempQueue + * @param selectedCandidates Already Selected preemption candidates + * @param queueReassignableResource Resource used in a queue + * @param orderedByPriority List of running apps + * @param perUserAMUsed AM used resource + * @return List of temp apps ordered from low to high priority + */ + private TreeSet calculateIdealAssignedResourcePerApp( + Resource clusterResource, Resource partitionBasedResource, + TempQueuePerPartition tq, + Map> selectedCandidates, + Resource queueReassignableResource, + PriorityQueue orderedByPriority, + Map perUserAMUsed) { + + Comparator reverseComp = Collections + .reverseOrder(new TAPriorityComparator()); + TreeSet orderedApps = new TreeSet<>(reverseComp); + + Map userIdealAssignedMapping = new HashMap<>(); + String partition = tq.partition; + + Map preCalculatedUserLimit = + new HashMap(); + + while (!orderedByPriority.isEmpty()) { + // Remove app from the next highest remaining priority and process it to + // calculate idealAssigned per app. + TempAppPerPartition tmpApp = orderedByPriority.remove(); + orderedApps.add(tmpApp); + + // Once unallocated resource is 0, we can stop assigning ideal per app. 
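The pseudocode in the javadoc above can be rendered as a small runnable sketch. It assumes one partition, longs in place of Resource, and a fixed user limit; it illustrates the shape of the loop, not the committed implementation:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdealAssignedSketch {
  static final class App {
    final String user; final long usedPlusPendingMb;
    App(String user, long mb) { this.user = user; this.usedPlusPendingMb = mb; }
  }

  public static void main(String[] args) {
    long reassignableMb = 6144L;  // Q.used - Q.selected
    long userLimitMb = 4096L;     // assumed scheduler user limit
    Map<String, Long> userToAllocated = new HashMap<>();
    Map<App, Long> idealAssigned = new HashMap<>();

    List<App> byPriorityDesc = new ArrayList<>(); // highest priority first
    byPriorityDesc.add(new App("alice", 3072L));
    byPriorityDesc.add(new App("bob", 2048L));
    byPriorityDesc.add(new App("alice", 2048L)); // hits alice's user limit

    for (App app : byPriorityDesc) {
      if (reassignableMb <= 0) {
        break; // queue has nothing left to hand out
      }
      long allocated = userToAllocated.getOrDefault(app.user, 0L);
      long ideal = 0L;
      if (allocated < userLimitMb) {
        ideal = Math.min(userLimitMb - allocated, app.usedPlusPendingMb);
        ideal = Math.min(reassignableMb, ideal);
        userToAllocated.put(app.user, allocated + ideal);
      } // else: skip this app because the user limit is reached
      idealAssigned.put(app, ideal);
      reassignableMb -= ideal;
    }
    // alice's apps get 3072 and 1024 (capped by her 4096 limit); bob gets 2048.
    idealAssigned.forEach((a, v) -> System.out.println(a.user + " -> " + v));
  }
}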
+ if (Resources.lessThanOrEqual(rc, clusterResource, + queueReassignableResource, Resources.none())) { + continue; + } + + String userName = tmpApp.app.getUser(); + Resource userLimitResource = preCalculatedUserLimit.get(userName); + + // Verify whether we already calculated headroom for this user. + if (userLimitResource == null) { + userLimitResource = Resources.clone(tq.leafQueue + .getUserLimitPerUser(userName, partitionBasedResource, partition)); + + Resource amUsed = perUserAMUsed.get(userName); + if (null == amUsed) { + amUsed = Resources.createResource(0, 0); + } + + // Real AM used need not have to be considered for user-limit as well. + userLimitResource = Resources.subtract(userLimitResource, amUsed); + if (LOG.isDebugEnabled()) { + LOG.debug("Userlimit for user '" + userName + "' is :" + + userLimitResource + ", and amUsed is:" + amUsed); + } + + preCalculatedUserLimit.put(userName, userLimitResource); + } + + Resource idealAssignedForUser = userIdealAssignedMapping.get(userName); + + if (idealAssignedForUser == null) { + idealAssignedForUser = Resources.createResource(0, 0); + userIdealAssignedMapping.put(userName, idealAssignedForUser); + } + + // Calculate total selected container resources from current app. + getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, + tmpApp, partition); + + // For any app, used+pending will give its idealAssigned. However it will + // be tightly linked to queue's unallocated quota. So lower priority apps + // idealAssigned may fall to 0 if higher priority apps demand is more. + Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(), + tmpApp.getPending()); + Resources.subtractFrom(appIdealAssigned, tmpApp.selected); + + if (Resources.lessThan(rc, clusterResource, idealAssignedForUser, + userLimitResource)) { + appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned, + Resources.subtract(userLimitResource, idealAssignedForUser)); + tmpApp.idealAssigned = Resources.clone(Resources.min(rc, + clusterResource, queueReassignableResource, appIdealAssigned)); + Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned); + } else { + continue; + } + + // Also set how much resource is needed by this app from others. + Resource appUsedExcludedSelected = Resources + .subtract(tmpApp.getUsedDeductAM(), tmpApp.selected); + if (Resources.greaterThan(rc, clusterResource, tmpApp.idealAssigned, + appUsedExcludedSelected)) { + tmpApp.setToBePreemptFromOther( + Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected)); + } + + Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned); + } + + return orderedApps; + } + + /* + * Previous policies would have already selected few containers from an + * application. Calculate total resource from these selected containers. 
+ */ + private void getAlreadySelectedPreemptionCandidatesResource( + Map> selectedCandidates, + TempAppPerPartition tmpApp, String partition) { + tmpApp.selected = Resources.createResource(0, 0); + Set containers = selectedCandidates + .get(tmpApp.app.getApplicationAttemptId()); + + if (containers == null) { + return; + } + + for (RMContainer cont : containers) { + if (partition.equals(cont.getNodeLabelExpression())) { + Resources.addTo(tmpApp.selected, cont.getAllocatedResource()); + } + } + } + + private PriorityQueue createTempAppForResCalculation( + String partition, Collection apps, + TAPriorityComparator taComparator) { + PriorityQueue orderedByPriority = new PriorityQueue<>( + 100, taComparator); + + // have an internal temp app structure to store intermediate data(priority) + for (FiCaSchedulerApp app : apps) { + + Resource used = app.getAppAttemptResourceUsage().getUsed(partition); + Resource amUsed = null; + if (!app.isWaitingForAMContainer()) { + amUsed = app.getAMResource(partition); + } + Resource pending = app.getTotalPendingRequestsPerPartition() + .get(partition); + Resource reserved = app.getAppAttemptResourceUsage() + .getReserved(partition); + + used = (used == null) ? Resources.createResource(0, 0) : used; + amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed; + pending = (pending == null) ? Resources.createResource(0, 0) : pending; + reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved; + + HashSet partitions = new HashSet( + app.getAppAttemptResourceUsage().getNodePartitionsSet()); + partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet()); + + // Create TempAppPerQueue for further calculation. + TempAppPerPartition tmpApp = new TempAppPerPartition(app, + Resources.clone(used), Resources.clone(amUsed), + Resources.clone(reserved), Resources.clone(pending)); + + // Set ideal allocation of app as 0. + tmpApp.idealAssigned = Resources.createResource(0, 0); + + orderedByPriority.add(tmpApp); + } + return orderedByPriority; + } + + /* + * Fifo+Priority based preemption policy need not have to preempt resources at + * same priority level. Such cases will be validated out. 
+ */ + public void validateOutSameAppPriorityFromDemand(Resource cluster, + TreeSet appsOrderedfromLowerPriority) { + + TempAppPerPartition[] apps = appsOrderedfromLowerPriority + .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]); + if (apps.length <= 0) { + return; + } + + int lPriority = 0; + int hPriority = apps.length - 1; + + while (lPriority < hPriority + && !apps[lPriority].equals(apps[hPriority]) + && apps[lPriority].getPriority() < apps[hPriority].getPriority()) { + Resource toPreemptFromOther = apps[hPriority] + .getToBePreemptFromOther(); + Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted(); + Resource delta = Resources.subtract(apps[lPriority].toBePreempted, + actuallyToPreempt); + + if (Resources.greaterThan(rc, cluster, delta, Resources.none())) { + Resource toPreempt = Resources.min(rc, cluster, + toPreemptFromOther, delta); + + apps[hPriority].setToBePreemptFromOther( + Resources.subtract(toPreemptFromOther, toPreempt)); + apps[lPriority].setActuallyToBePreempted( + Resources.add(actuallyToPreempt, toPreempt)); + } + + if (Resources.lessThanOrEqual(rc, cluster, + apps[lPriority].toBePreempted, + apps[lPriority].getActuallyToBePreempted())) { + lPriority++; + continue; + } + + if (Resources.equals(apps[hPriority].getToBePreemptFromOther(), + Resources.none())) { + hPriority--; + continue; + } + } + } + + private Resource calculateUsedAMResourcesPerQueue(String partition, + LeafQueue leafQueue, Map perUserAMUsed) { + Collection runningApps = leafQueue.getApplications(); + Resource amUsed = Resources.createResource(0, 0); + + for (FiCaSchedulerApp app : runningApps) { + Resource userAMResource = perUserAMUsed.get(app.getUser()); + if (null == userAMResource) { + userAMResource = Resources.createResource(0, 0); + perUserAMUsed.put(app.getUser(), userAMResource); + } + + Resources.addTo(userAMResource, app.getAMResource(partition)); + Resources.addTo(amUsed, app.getAMResource(partition)); + } + return amUsed; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java new file mode 100644 index 0000000..039b53e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Identifies over utilized resources within a queue and tries to normalize + * them to resolve resource allocation anomalies w.r.t priority and user-limit. + */ +public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { + + @SuppressWarnings("serial") + static class TAPriorityComparator + implements + Serializable, + Comparator { + + @Override + public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) { + Priority p1 = Priority.newInstance(tq1.getPriority()); + Priority p2 = Priority.newInstance(tq2.getPriority()); + + if (!p1.equals(p2)) { + return p1.compareTo(p2); + } + return tq1.getApplicationId().compareTo(tq2.getApplicationId()); + } + } + + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; + final CapacitySchedulerPreemptionContext context; + + private static final Log LOG = + LogFactory.getLog(IntraQueueCandidatesSelector.class); + + IntraQueueCandidatesSelector( + CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + fifoPreemptionComputePlugin = new FifoIntraQueuePreemptionPlugin(rc, + preemptionContext); + context = preemptionContext; + } + + @Override + public Map> selectCandidates( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed) { + + // 1. Calculate the abnormality within each queue one by one. + computeIntraQueuePreemptionDemand( + clusterResource, totalPreemptedResourceAllowed, selectedCandidates); + + // 2. Previous selectors (with higher priority) could have already + // selected containers. We need to deduct pre-emptable resources + // based on already selected candidates. + CapacitySchedulerPreemptionUtils + .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext, + selectedCandidates); + + // 3. Loop through all partitions to select containers for preemption. + for (String partition : preemptionContext.getAllPartitions()) { + LinkedHashSet queueNames = preemptionContext + .getUnderServedQueuesPerPartition(partition); + + // Error check to handle non-mapped labels to queue. + if (null == queueNames) { + continue; + } + + // 4. 
Iterate from most under-served queue in order. + for (String queueName : queueNames) { + LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).leafQueue; + + // skip if not a leafqueue + if (null == leafQueue) { + continue; + } + + // 5. Calculate the resource to obtain per partition + Map resToObtainByPartition = fifoPreemptionComputePlugin + .getResourceDemandFromAppsPerQueue(queueName, partition); + + // 6. Based on the selected resource demand per partition, select + // containers with known policy from inter-queue preemption. + synchronized (leafQueue) { + Iterator desc = leafQueue.getOrderingPolicy() + .getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp app = desc.next(); + preemptFromLeastStarvedApp(selectedCandidates, clusterResource, + totalPreemptedResourceAllowed, resToObtainByPartition, + leafQueue, app); + } + } + } + } + + return selectedCandidates; + } + + private void preemptFromLeastStarvedApp( + Map> selectedCandidates, + Resource clusterResource, Resource totalPreemptedResourceAllowed, + Map resToObtainByPartition, LeafQueue leafQueue, + FiCaSchedulerApp app) { + + // ToDo: Reuse reservation selector here. + + List liveContainers = new ArrayList<>( + app.getLiveContainers()); + sortContainers(liveContainers); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "totalPreemptedResourceAllowed for preemption at this round is :" + + totalPreemptedResourceAllowed); + } + + for (RMContainer c : liveContainers) { + + // if there are no demand, return. + if (resToObtainByPartition.isEmpty()) { + return; + } + + // skip preselected containers. + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedCandidates)) { + continue; + } + + // Skip already marked to killable containers + if (null != preemptionContext.getKillableContainers() && preemptionContext + .getKillableContainers().contains(c.getContainerId())) { + continue; + } + + // Skip AM Container from preemption for now. + if (c.isAMContainer()) { + continue; + } + + // Try to preempt this container + CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( + rc, preemptionContext, resToObtainByPartition, c, clusterResource, + selectedCandidates, totalPreemptedResourceAllowed); + } + + } + + private void computeIntraQueuePreemptionDemand(Resource clusterResource, + Resource totalPreemptedResourceAllowed, + Map> selectedCandidates) { + + // 1. Iterate through all partition to calculate demand within a partition. + for (String partition : context.getAllPartitions()) { + LinkedHashSet queueNames = context + .getUnderServedQueuesPerPartition(partition); + + if (null == queueNames) { + continue; + } + + // 2. Its better to get partition based resource limit earlier before + // starting calculation + Resource partitionBasedResource = + context.getPartitionResource(partition); + + // 3. loop through all queues corresponding to a partition. + for (String queueName : queueNames) { + TempQueuePerPartition tq = context.getQueueByPartition(queueName, + partition); + LeafQueue leafQueue = tq.leafQueue; + + // skip if its parent queue + if (null == leafQueue) { + continue; + } + + // 4. Consider reassignableResource as (used - actuallyToBePreempted). + // This provides as upper limit to split apps quota in a queue. + Resource queueReassignableResource = Resources.subtract(tq.getUsed(), + tq.getActuallyToBePreempted()); + + // 5. Check queue's used capacity. 
+        // 5. Check the queue's used capacity. Make sure that the used
+        // capacity is above a certain threshold before considering the queue
+        // for intra-queue preemption.
+        if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
+            .getMinimumThresholdForIntraQueuePreemption()) {
+          continue;
+        }
+
+        // 6. Compute the allocation of all apps based on the queue's
+        // unallocated capacity.
+        fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            totalPreemptedResourceAllowed, queueReassignableResource,
+            context.getMaxAllowableLimitForIntraQueuePreemption());
+      }
+    }
+  }
+}
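----------------------------------------------------------------------
Aside: the two-level ordering established by TAPriorityComparator above (priority first, then application id as a tie-breaker) can be exercised in isolation with the self-contained sketch below. TempApp is a hypothetical stand-in for TempAppPerPartition, and the real comparator delegates both comparisons to YARN's Priority and ApplicationId types, so the effective sort direction is whatever their compareTo implementations define.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TAPriorityOrderDemo {

  // Hypothetical stand-in for TempAppPerPartition: an app with a priority
  // and an application id.
  static final class TempApp {
    final int priority;
    final long appId;

    TempApp(int priority, long appId) {
      this.priority = priority;
      this.appId = appId;
    }

    @Override
    public String toString() {
      return "app-" + appId + "(pri=" + priority + ")";
    }
  }

  public static void main(String[] args) {
    List<TempApp> apps = new ArrayList<>();
    apps.add(new TempApp(3, 102));
    apps.add(new TempApp(1, 101));
    apps.add(new TempApp(3, 100));

    // Priority first, application id as the tie-breaker; this mirrors the
    // shape of TAPriorityComparator without depending on YARN types.
    apps.sort(Comparator.comparingInt((TempApp a) -> a.priority)
        .thenComparingLong(a -> a.appId));

    // Prints: [app-101(pri=1), app-100(pri=3), app-102(pri=3)]
    System.out.println(apps);
  }
}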
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
new file mode 100644
index 0000000..93ebe65
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+interface IntraQueuePreemptionComputePlugin {
+
+  Map<String, Resource> getResourceDemandFromAppsPerQueue(String queueName,
+      String partition);
+
+  void computeAppsIdealAllocation(Resource clusterResource,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
+      float maxAllowablePreemptLimit);
+}
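----------------------------------------------------------------------
Aside: the interface above is the seam that keeps demand computation (currently FIFO plus app priority, via FifoIntraQueuePreemptionPlugin) separate from the container-selection loop in IntraQueueCandidatesSelector. The toy sketch below shows that shape with deliberately simplified types; DemandPlugin and MapDemandPlugin are hypothetical stand-ins rather than the Hadoop types, and per-partition demand is reduced to a plain memory figure in MB.

import java.util.HashMap;
import java.util.Map;

public class PluginSeamDemo {

  // Simplified analogue of getResourceDemandFromAppsPerQueue: for one queue,
  // the memory (MB) that should be obtained, keyed by partition.
  interface DemandPlugin {
    Map<String, Long> demandPerPartition(String queueName);
  }

  // One possible implementation over canned data; a real plugin would derive
  // these numbers from the apps running in the queue.
  static final class MapDemandPlugin implements DemandPlugin {
    private final Map<String, Map<String, Long>> byQueue = new HashMap<>();

    MapDemandPlugin put(String queue, String partition, long mb) {
      byQueue.computeIfAbsent(queue, q -> new HashMap<>()).put(partition, mb);
      return this;
    }

    @Override
    public Map<String, Long> demandPerPartition(String queueName) {
      return byQueue.getOrDefault(queueName, new HashMap<>());
    }
  }

  public static void main(String[] args) {
    DemandPlugin plugin = new MapDemandPlugin()
        .put("default", "", 4096L)
        .put("default", "gpu", 1024L);

    // The selector only sees the demand map; how the demand was computed
    // (FIFO, fair-share, ...) stays behind the interface.
    System.out.println(plugin.demandPerPartition("default"));
  }
}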
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index d1d2485..907785e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.Set;
 
 /**
  * Calculate how much resources need to be preempted for each queue,
  * will be used by {@link PreemptionCandidatesSelector}.
  */
-public class PreemptableResourceCalculator {
+public class PreemptableResourceCalculator
+    extends AbstractPreemptableResourceCalculator {
 
   private static final Log LOG =
       LogFactory.getLog(PreemptableResourceCalculator.class);
 
-  private final CapacitySchedulerPreemptionContext context;
-  private final ResourceCalculator rc;
   private boolean isReservedPreemptionCandidatesSelector;
 
-  static class TQComparator implements Comparator<TempQueuePerPartition> {
-    private ResourceCalculator rc;
-    private Resource clusterRes;
-
-    TQComparator(ResourceCalculator rc, Resource clusterRes) {
-      this.rc = rc;
-      this.clusterRes = clusterRes;
-    }
-
-    @Override
-    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
-      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
-        return -1;
-      }
-      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
-        return 1;
-      }
-      return 0;
-    }
-
-    // Calculates idealAssigned / guaranteed.
-    // TempQueues with 0 guarantees are always considered the most over
-    // capacity and therefore considered last for resources.
-    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
-      double pctOver = Integer.MAX_VALUE;
-      if (q != null && Resources.greaterThan(rc, clusterRes,
-          q.getGuaranteed(), Resources.none())) {
-        pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
-            q.getGuaranteed());
-      }
-      return (pctOver);
-    }
-  }
-
   /**
    * PreemptableResourceCalculator constructor
    *
@@ -93,136 +54,7 @@ public class PreemptableResourceCalculator {
   public PreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
       boolean isReservedPreemptionCandidatesSelector) {
-    context = preemptionContext;
-    rc = preemptionContext.getResourceCalculator();
-    this.isReservedPreemptionCandidatesSelector =
-        isReservedPreemptionCandidatesSelector;
-  }
-
-  /**
-   * Computes a normalized guaranteed capacity based on active queues.
-   * @param rc resource calculator
-   * @param clusterResource the total amount of resources in the cluster
-   * @param queues the list of queues to consider
-   */
-  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
-    Resource activeCap = Resource.newInstance(0, 0);
-
-    if (ignoreGuar) {
-      for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = 1.0f / queues.size();
-      }
-    } else {
-      for (TempQueuePerPartition q : queues) {
-        Resources.addTo(activeCap, q.getGuaranteed());
-      }
-      for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-            q.getGuaranteed(), activeCap);
-      }
-    }
-  }
-
-  // Take the most underserved TempQueue (the one at the head). Collect and
-  // return the list of all queues that have the same idealAssigned
-  // percentage of guaranteed.
-  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
-      PriorityQueue<TempQueuePerPartition> orderedByNeed,
-      TQComparator tqComparator) {
-    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
-    while (!orderedByNeed.isEmpty()) {
-      TempQueuePerPartition q1 = orderedByNeed.remove();
-      underserved.add(q1);
-      TempQueuePerPartition q2 = orderedByNeed.peek();
-      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
-      // return what has already been collected. Otherwise, q1's pct of
-      // guaranteed == that of q2, so add q2 to the underserved list during
-      // the next pass.
-      if (q2 == null || tqComparator.compare(q1, q2) < 0) {
-        return underserved;
-      }
-    }
-    return underserved;
-  }
-
-
-  /**
-   * Given a set of queues, compute the fix-point distribution of unassigned
-   * resources among them. As the pending requests of a queue are exhausted,
-   * the queue is removed from the set and the remaining capacity is
-   * redistributed among the remaining queues. The distribution is weighted
-   * based on guaranteed capacity, unless asked to ignoreGuarantee, in which
-   * case resources are distributed uniformly.
-   */
-  private void computeFixpointAllocation(ResourceCalculator rc,
-      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
-      Resource unassigned, boolean ignoreGuarantee) {
-    // Prior to assigning the unused resources, process each queue as follows:
-    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
-    // Else idealAssigned = current;
-    // Subtract idealAssigned resources from unassigned.
-    // If the queue has all of its needs met (that is, if
-    // idealAssigned >= current + pending), remove it from consideration.
-    // Sort queues from most under-guaranteed to most over-guaranteed.
-    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
-    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(
-        10, tqComparator);
-    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
-      TempQueuePerPartition q = i.next();
-      Resource used = q.getUsed();
-
-      if (Resources.greaterThan(rc, tot_guarant, used, q.getGuaranteed())) {
-        q.idealAssigned = Resources.add(
-            q.getGuaranteed(), q.untouchableExtra);
-      } else {
-        q.idealAssigned = Resources.clone(used);
-      }
-      Resources.subtractFrom(unassigned, q.idealAssigned);
-      // If idealAssigned < (used + pending), q needs more resources, so
-      // add it to the list of underserved queues, ordered by need.
-      Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
-      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
-        orderedByNeed.add(q);
-      }
-    }
-
-    // Assign all cluster resources until there is no more demand, or no
-    // resources are left.
-    while (!orderedByNeed.isEmpty()
-        && Resources.greaterThan(rc, tot_guarant, unassigned,
-            Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
-      // We compute normalizedGuarantee capacity based on the currently
-      // active queues.
-      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
-
-      // For each underserved queue (or set of queues if multiple are equally
-      // underserved), offer its share of the unassigned resources based on its
-      // normalized guarantee. After the offer, if the queue is not satisfied,
-      // place it back in the ordered list of queues, recalculating its place
-      // in the order of most under-guaranteed to most over-guaranteed. In this
-      // way, the most underserved queue(s) are always given resources first.
-      Collection<TempQueuePerPartition> underserved =
-          getMostUnderservedQueues(orderedByNeed, tqComparator);
-      for (Iterator<TempQueuePerPartition> i = underserved.iterator();
-          i.hasNext();) {
-        TempQueuePerPartition sub = i.next();
-        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
-            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
-            isReservedPreemptionCandidatesSelector);
-        Resource wQdone = Resources.subtract(wQavail, wQidle);
-
-        if (Resources.greaterThan(rc, tot_guarant,
-            wQdone, Resources.none())) {
-          // The queue is still asking for more. Put it back in the priority
-          // queue, recalculating its order based on need.
-          orderedByNeed.add(sub);
-        }
-        Resources.addTo(wQassigned, wQdone);
-      }
-      Resources.subtractFrom(unassigned, wQassigned);
-    }
+    super(preemptionContext, isReservedPreemptionCandidatesSelector);
   }
 
   /**
@@ -263,14 +95,14 @@ public class PreemptableResourceCalculator {
     }
 
     // first compute the allocation as a fixpoint based on guaranteed capacity
-    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+    computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
         false);
 
     // if any capacity is left unassigned, it is distributed among the
    // zero-guarantee queues uniformly (i.e., not based on guaranteed
    // capacity, as this is zero)
     if (!zeroGuarQueues.isEmpty()
         && Resources.greaterThan(rc, tot_guarant, unassigned,
            Resources.none())) {
-      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+      computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned,
           true);
     }
 
@@ -321,13 +153,12 @@ public class PreemptableResourceCalculator {
       computeIdealResourceDistribution(rc, root.getChildren(),
           totalPreemptionAllowed, root.idealAssigned);
       // compute recursively for lower levels and build a list of leaves
-      for(TempQueuePerPartition t : root.getChildren()) {
+      for (TempQueuePerPartition t : root.getChildren()) {
         recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
       }
     }
   }
 
-
   private void calculateResToObtainByPartitionForLeafQueues(
       Set<String> leafQueueNames, Resource clusterResource) {
     // Loop over all leaf queues
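----------------------------------------------------------------------
Aside: the computeFixpointAllocation logic deleted above (it now lives in AbstractPreemptableResourceCalculator, per the class hierarchy change in this patch) repeatedly offers the unassigned capacity to the currently most under-served queues in proportion to their normalized guarantees, until demand or capacity runs out. The sketch below reproduces that loop for a single scalar resource; Queue and computeFixpoint are hypothetical simplifications of TempQueuePerPartition and the real calculator, and details such as untouchableExtra, reserved-container handling, and rounding are deliberately ignored.

import java.util.ArrayList;
import java.util.List;

public class FixpointDemo {

  // Hypothetical, single-resource stand-in for TempQueuePerPartition.
  static final class Queue {
    final String name;
    final double guaranteed;
    final double demand; // used + pending, the most the queue can absorb
    double idealAssigned;

    Queue(String name, double guaranteed, double demand) {
      this.name = name;
      this.guaranteed = guaranteed;
      this.demand = demand;
    }
  }

  static void computeFixpoint(List<Queue> queues, double unassigned) {
    List<Queue> underserved = new ArrayList<>(queues);
    while (unassigned > 1e-9 && !underserved.isEmpty()) {
      // Normalize guarantees over the queues that are still asking.
      double totalGuar =
          underserved.stream().mapToDouble(q -> q.guaranteed).sum();
      double round = unassigned;
      List<Queue> next = new ArrayList<>();
      for (Queue q : underserved) {
        // Offer each queue its weighted share; uniform if no guarantees.
        double share = totalGuar > 0 ? round * (q.guaranteed / totalGuar)
            : round / underserved.size();
        double accepted = Math.min(share, q.demand - q.idealAssigned);
        q.idealAssigned += accepted;
        unassigned -= accepted;
        if (q.demand - q.idealAssigned > 1e-9) {
          next.add(q); // still asking for more; keep it in the loop
        }
      }
      if (next.size() == underserved.size() && unassigned >= round - 1e-9) {
        break; // nothing was accepted this round; avoid spinning
      }
      underserved = next;
    }
  }

  public static void main(String[] args) {
    List<Queue> queues = new ArrayList<>();
    queues.add(new Queue("a", 60, 80));
    queues.add(new Queue("b", 40, 30));
    computeFixpoint(queues, 100);
    // b saturates at 30; the leftover 10 flows back to a, giving a=70.
    queues.forEach(q ->
        System.out.println(q.name + " idealAssigned=" + q.idealAssigned));
  }
}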
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cef281ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index dd33d8f..b48a287 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -41,7 +46,7 @@ public abstract class PreemptionCandidatesSelector {
    * selected candidates.
    *
    * @param selectedCandidates already selected candidates from previous policies
-   * @param clusterResource
+   * @param clusterResource total resource
    * @param totalPreemptedResourceAllowed how many resources allowed to be
    *          preempted in this round
    * @return merged selected candidates.
@@ -49,4 +54,26 @@ public abstract class PreemptionCandidatesSelector {
   public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed);
+
+  /**
+   * Compare by reversed priority order first, and then reversed containerId
+   * order.
+   *
+   * @param containers list of containers to sort.
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers) {
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        int schedKeyComp = b.getAllocatedSchedulerKey()
+            .compareTo(a.getAllocatedSchedulerKey());
+        if (schedKeyComp != 0) {
+          return schedKeyComp;
+        }
+        return b.getContainerId().compareTo(a.getContainerId());
+      }
+    });
+  }
 }
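----------------------------------------------------------------------
Aside: sortContainers() above orders an application's live containers by reversed scheduler key and then reversed container id, which is the order preemptFromLeastStarvedApp walks them in. The self-contained sketch below demonstrates the same reversed two-key comparator shape; Container is a hypothetical stand-in for RMContainer, with plain numbers in place of SchedulerRequestKey and ContainerId.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortContainersDemo {

  // Hypothetical stand-in for RMContainer.
  static final class Container {
    final int schedKey;     // stand-in for the allocated scheduler key
    final long containerId; // stand-in for ContainerId

    Container(int schedKey, long containerId) {
      this.schedKey = schedKey;
      this.containerId = containerId;
    }

    @Override
    public String toString() {
      return "c" + containerId + "(key=" + schedKey + ")";
    }
  }

  public static void main(String[] args) {
    List<Container> live = new ArrayList<>();
    live.add(new Container(1, 10));
    live.add(new Container(2, 11));
    live.add(new Container(2, 12));

    // Both keys are compared in reverse (b against a), which is exactly the
    // shape of the anonymous Comparator added by the patch.
    Collections.sort(live, new Comparator<Container>() {
      @Override
      public int compare(Container a, Container b) {
        int keyComp = Integer.compare(b.schedKey, a.schedKey);
        if (keyComp != 0) {
          return keyComp;
        }
        return Long.compare(b.containerId, a.containerId);
      }
    });

    // Highest key first, larger ids first within a key:
    // [c12(key=2), c11(key=2), c10(key=1)]
    System.out.println(live);
  }
}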