From: kasha@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Tue, 09 Dec 2014 03:31:21 -0000
Subject: [27/41] hadoop git commit: YARN-2056. Disable preemption at Queue level. Contributed by Eric Payne

YARN-2056. Disable preemption at Queue level. Contributed by Eric Payne

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b130821
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b130821
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b130821

Branch: refs/heads/YARN-2139
Commit: 4b130821995a3cfe20c71e38e0f63294085c0491
Parents: 3c72f54
Author: Jason Lowe
Authored: Fri Dec 5 21:06:48 2014 +0000
Committer: Jason Lowe
Committed: Fri Dec 5 21:06:48 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../ProportionalCapacityPreemptionPolicy.java   | 170 +++++++++--
 ...estProportionalCapacityPreemptionPolicy.java | 283 ++++++++++++++++++-
 3 files changed, 424 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b130821/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9804d61..0b88959 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -126,6 +126,8 @@ Release 2.7.0 - UNRELEASED
 
     YARN-2301. Improved yarn container command. (Naganarasimha G R via jianhe)
 
+    YARN-2056. Disable preemption at Queue level (Eric Payne via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b130821/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 0f48b0c..1a3f804 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.PriorityQueue;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -111,6 +112,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
+  public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+  public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
   // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
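----------------------------------------------------------------------
For context on the two constants added above: they are concatenated around a
queue path to form the per-queue property name. A minimal, self-contained
sketch; the queue path "root.queueA.queueB" is illustrative, not part of the
patch:

public class DisablePreemptionKeySketch {
  // Mirrors the two constants introduced in this commit.
  static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
  static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";

  public static void main(String[] args) {
    String queuePath = "root.queueA.queueB";
    // Prints: yarn.scheduler.capacity.root.queueA.queueB.disable_preemption
    System.out.println(
        BASE_YARN_RM_PREEMPTION + queuePath + SUFFIX_DISABLE_PREEMPTION);
  }
}
----------------------------------------------------------------------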
@@ -192,7 +196,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // extract a summary of the queues from scheduler
     TempQueue tRoot;
     synchronized (scheduler) {
-      tRoot = cloneQueues(root, clusterResources);
+      tRoot = cloneQueues(root, clusterResources, false);
     }
 
     // compute the ideal distribution of resources among queues
@@ -370,28 +374,60 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private void computeFixpointAllocation(ResourceCalculator rc,
       Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
       boolean ignoreGuarantee) {
+    // Prior to assigning the unused resources, process each queue as follows:
+    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+    // Else idealAssigned = current;
+    // Subtract idealAssigned resources from unassigned.
+    // If the queue has all of its needs met (that is, if
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
+    PriorityQueue<TempQueue> orderedByNeed =
+        new PriorityQueue<TempQueue>(10, tqComparator);
+    for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueue q = i.next();
+      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
+        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(q.current);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (current + pending), q needs more resources, so
+      // add it to the list of underserved queues, ordered by need.
+      Resource curPlusPend = Resources.add(q.current, q.pending);
+      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
     //assign all cluster resources until no more demand, or no resources are left
-    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
-        unassigned, Resources.none())) {
+    while (!orderedByNeed.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
       Resource wQassigned = Resource.newInstance(0, 0);
-
       // we compute normalizedGuarantees capacity based on currently active
       // queues
-      resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
-
-      // offer for each queue their capacity first and in following invocations
-      // their share of over-capacity
-      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueue> underserved =
+          getMostUnderservedQueues(orderedByNeed, tqComparator);
+      for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
         TempQueue sub = i.next();
-        Resource wQavail =
-          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
         Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
-        // if the queue returned a value > 0 it means it is fully satisfied
-        // and it is removed from the list of active queues qAlloc
-        if (!Resources.greaterThan(rc, tot_guarant,
+
+        if (Resources.greaterThan(rc, tot_guarant,
             wQdone, Resources.none())) {
-          i.remove();
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
         }
         Resources.addTo(wQassigned, wQdone);
       }
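----------------------------------------------------------------------
The rewritten computeFixpointAllocation() above first seeds each queue's
idealAssigned, then hands leftover resources to the most underserved queues
first. A toy model of that two-phase shape, using plain ints for resources and
a simplified offer step that grants the head queue everything it wants
(the real code offers normalized proportional shares instead); all names and
numbers here are illustrative:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class FixpointSketch {
  static class ToyQueue {
    final String name;
    final int guaranteed, current, pending;
    int idealAssigned;
    ToyQueue(String name, int g, int c, int p) {
      this.name = name; guaranteed = g; current = c; pending = p;
    }
    // idealAssigned as a fraction of guarantee; 0-guarantee queues sort last.
    double pctOfGuaranteed() {
      return guaranteed == 0 ? Double.MAX_VALUE
                             : (double) idealAssigned / guaranteed;
    }
  }

  public static void main(String[] args) {
    List<ToyQueue> queues = new ArrayList<>();
    queues.add(new ToyQueue("A", 50, 20, 40)); // under guarantee, needy
    queues.add(new ToyQueue("B", 50, 80, 0));  // over guarantee
    int unassigned = 100;

    // Phase 1: seed idealAssigned and collect underserved queues by need.
    PriorityQueue<ToyQueue> orderedByNeed = new PriorityQueue<>(
        Comparator.comparingDouble(ToyQueue::pctOfGuaranteed));
    for (ToyQueue q : queues) {
      // min(current, guaranteed) == "guaranteed + untouchable extra" when
      // the untouchable extra is zero, as in this toy.
      q.idealAssigned = Math.min(q.current, q.guaranteed);
      unassigned -= q.idealAssigned;
      if (q.idealAssigned < q.current + q.pending) {
        orderedByNeed.add(q);
      }
    }
    // Phase 2: feed unassigned resources to the most underserved queue first.
    while (!orderedByNeed.isEmpty() && unassigned > 0) {
      ToyQueue q = orderedByNeed.remove(); // remove before mutating its key
      int want = q.current + q.pending - q.idealAssigned;
      int grant = Math.min(want, unassigned);
      q.idealAssigned += grant;
      unassigned -= grant;
      if (q.idealAssigned < q.current + q.pending) {
        orderedByNeed.add(q); // still asking for more; re-enter by need
      }
    }
    for (ToyQueue q : queues) {
      System.out.println(q.name + " idealAssigned=" + q.idealAssigned);
    }
  }
}
----------------------------------------------------------------------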
@@ -399,6 +435,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
   }
 
+  // Take the most underserved TempQueue (the one on the head). Collect and
+  // return the list of all queues that have the same idealAssigned
+  // percentage of guaranteed.
+  protected Collection<TempQueue> getMostUnderservedQueues(
+      PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
+    ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
+    while (!orderedByNeed.isEmpty()) {
+      TempQueue q1 = orderedByNeed.remove();
+      underserved.add(q1);
+      TempQueue q2 = orderedByNeed.peek();
+      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+      // return what has already been collected. Otherwise, q1's pct of
+      // guaranteed == that of q2, so add q2 to the underserved list during
+      // the next pass.
+      if (q2 == null || tqComparator.compare(q1, q2) < 0) {
+        return underserved;
+      }
+    }
+    return underserved;
+  }
+
   /**
    * Computes a normalizedGuaranteed capacity based on active queues
    * @param rc resource calculator
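----------------------------------------------------------------------
getMostUnderservedQueues() pops the head of the priority queue, then keeps
collecting while the next element compares equal to it, so exact ties are
served together. The same pattern on plain integers (values illustrative):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class DrainTiesSketch {
  static List<Integer> mostUnderserved(PriorityQueue<Integer> pq,
      Comparator<Integer> cmp) {
    List<Integer> ties = new ArrayList<>();
    while (!pq.isEmpty()) {
      Integer head = pq.remove();
      ties.add(head);
      Integer next = pq.peek();
      // head is never "larger" than next; stop as soon as it is strictly less.
      if (next == null || cmp.compare(head, next) < 0) {
        break;
      }
    }
    return ties;
  }

  public static void main(String[] args) {
    Comparator<Integer> cmp = Comparator.naturalOrder();
    PriorityQueue<Integer> pq = new PriorityQueue<>(cmp);
    for (int v : new int[] {3, 1, 1, 2}) {
      pq.add(v);
    }
    System.out.println(mostUnderserved(pq, cmp)); // prints [1, 1]
  }
}
----------------------------------------------------------------------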
@@ -626,9 +683,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    *
    * @param root the root of the CapacityScheduler queue hierarchy
    * @param clusterResources the total amount of resources in the cluster
+   * @param parentDisablePreempt true if disable preemption is set for parent
    * @return the root of the cloned queue hierarchy
    */
-  private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
+  private TempQueue cloneQueues(CSQueue root, Resource clusterResources,
+      boolean parentDisablePreempt) {
     TempQueue ret;
     synchronized (root) {
       String queueName = root.getQueueName();
@@ -639,19 +698,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+      boolean queueDisablePreemption = false;
+      String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+          + SUFFIX_DISABLE_PREEMPTION;
+      queueDisablePreemption = scheduler.getConfiguration()
+          .getBoolean(queuePropName, parentDisablePreempt);
+
+      Resource extra = Resource.newInstance(0, 0);
+      if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+        extra = Resources.subtract(current, guaranteed);
+      }
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
             maxCapacity);
-
+        if (queueDisablePreemption) {
+          ret.untouchableExtra = extra;
+        } else {
+          ret.preemptableExtra = extra;
+        }
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending,
             guaranteed, maxCapacity);
+        Resource childrensPreemptable = Resource.newInstance(0, 0);
         for (CSQueue c : root.getChildQueues()) {
-          ret.addChild(cloneQueues(c, clusterResources));
+          TempQueue subq =
+              cloneQueues(c, clusterResources, queueDisablePreemption);
+          Resources.addTo(childrensPreemptable, subq.preemptableExtra);
+          ret.addChild(subq);
+        }
+        // untouchableExtra = max(extra - childrenPreemptable, 0)
+        if (Resources.greaterThanOrEqual(
+            rc, clusterResources, childrensPreemptable, extra)) {
+          ret.untouchableExtra = Resource.newInstance(0, 0);
+        } else {
+          ret.untouchableExtra =
+              Resources.subtractFrom(extra, childrensPreemptable);
+        }
       }
     }
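----------------------------------------------------------------------
The parent-queue branch above computes
untouchableExtra = max(extra - childrensPreemptable, 0): a parent's
over-capacity is only protected to the extent its children do not already
expose it as preemptable. A worked example with plain ints standing in for
Resource objects (numbers illustrative):

public class UntouchableExtraSketch {
  static int untouchableExtra(int extra, int childrensPreemptable) {
    return Math.max(extra - childrensPreemptable, 0);
  }

  public static void main(String[] args) {
    // Parent is 30 over its guarantee; children mark 20 of it preemptable.
    System.out.println(untouchableExtra(30, 20)); // 10 stays untouchable
    // Children expose more preemptable extra than the parent has: none held back.
    System.out.println(untouchableExtra(30, 40)); // 0
  }
}
----------------------------------------------------------------------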
@@ -690,6 +776,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
+    Resource untouchableExtra;
+    Resource preemptableExtra;
 
     double normalizedGuarantee;
 
@@ -708,6 +796,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       this.toBePreempted = Resource.newInstance(0, 0);
       this.normalizedGuarantee = Float.NaN;
       this.children = new ArrayList<TempQueue>();
+      this.untouchableExtra = Resource.newInstance(0, 0);
+      this.preemptableExtra = Resource.newInstance(0, 0);
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -761,10 +851,20 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
         .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append(" UNTOUCHABLE: ").append(untouchableExtra)
+        .append(" PREEMPTABLE: ").append(preemptableExtra)
         .append("\n");
       return sb.toString();
     }
 
+    public void printAll() {
+      LOG.info(this.toString());
+      for (TempQueue sub : this.getChildren()) {
+        sub.printAll();
+      }
+    }
+
     public void assignPreemption(float scalingFactor,
         ResourceCalculator rc, Resource clusterResource) {
       if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
@@ -793,4 +893,38 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     }
   }
 
+  static class TQComparator implements Comparator<TempQueue> {
+    private ResourceCalculator rc;
+    private Resource clusterRes;
+
+    TQComparator(ResourceCalculator rc, Resource clusterRes) {
+      this.rc = rc;
+      this.clusterRes = clusterRes;
+    }
+
+    @Override
+    public int compare(TempQueue tq1, TempQueue tq2) {
+      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+        return -1;
+      }
+      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+        return 1;
+      }
+      return 0;
+    }
+
+    // Calculates idealAssigned / guaranteed.
+    // TempQueues with 0 guarantees are always considered the most over
+    // capacity and therefore considered last for resources.
+    private double getIdealPctOfGuaranteed(TempQueue q) {
+      double pctOver = Integer.MAX_VALUE;
+      if (q != null && Resources.greaterThan(
+          rc, clusterRes, q.guaranteed, Resources.none())) {
+        pctOver =
+            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+      }
+      return (pctOver);
+    }
+  }
 }
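----------------------------------------------------------------------
How a deployment might actually flip the new flag: in a real cluster the
property would live in capacity-scheduler.xml, but programmatically it is the
same Configuration call the tests below use. A minimal sketch, assuming
hadoop-common on the classpath; the queue name is illustrative:

import org.apache.hadoop.conf.Configuration;

public class DisablePreemptionConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false); // skip default resources
    conf.setBoolean(
        "yarn.scheduler.capacity.root.queueB.disable_preemption", true);
    // Child queues inherit this value unless they override it themselves.
    System.out.println(conf.getBoolean(
        "yarn.scheduler.capacity.root.queueB.disable_preemption", false));
  }
}
----------------------------------------------------------------------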
"root.queueB" + SUFFIX_DISABLE_PREEMPTION, true); + + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // With PREEMPTION_DISABLED set for queueB, get resources from queueC + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); + + // With no PREEMPTION_DISABLED set for queueB, resources will be preempted + // from both queueB and queueC. Test must be reset for so that the mDisp + // event handler will count only events from the following test and not the + // previous one. + setup(); + ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); + + schedConf.setBoolean(BASE_YARN_RM_PREEMPTION + + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false); + policy2.editSchedule(); + + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC))); + } + + @Test + public void testPerQueueDisablePreemptionHierarchical() { + int[][] qData = new int[][] { + // / A D + // B C E F + { 200, 100, 50, 50, 100, 10, 90 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap + { 200, 110, 60, 50, 90, 90, 0 }, // used + { 10, 0, 0, 0, 10, 0, 10 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + // appA appB appC appD + { 4, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 2, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify capacity taken from queueB (appA), not queueE (appC) despite + // queueE being far over its absolute capacity because queueA (queueB's + // parent) is over capacity and queueD (queueE's parent) is not. + ApplicationAttemptId expectedAttemptOnQueueB = + ApplicationAttemptId.newInstance( + appA.getApplicationId(), appA.getAttemptId()); + assertTrue("appA should be running on queueB", + mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB)); + verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + + // Need to call setup() again to reset mDisp + setup(); + // Disable preemption for queueB and it's children + schedConf.setBoolean(BASE_YARN_RM_PREEMPTION + + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true); + ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); + policy2.editSchedule(); + ApplicationAttemptId expectedAttemptOnQueueC = + ApplicationAttemptId.newInstance( + appB.getApplicationId(), appB.getAttemptId()); + ApplicationAttemptId expectedAttemptOnQueueE = + ApplicationAttemptId.newInstance( + appC.getApplicationId(), appC.getAttemptId()); + // Now, all of queueB's (appA) over capacity is not preemptable, so neither + // is queueA's. Verify that capacity is taken from queueE (appC). + assertTrue("appB should be running on queueC", + mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC)); + assertTrue("appC should be running on queueE", + mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE)); + // Resources should have come from queueE (appC) and neither of queueA's + // children. 
+
+  @Test
+  public void testPerQueueDisablePreemptionHierarchical() {
+    int[][] qData = new int[][] {
+      //  /    A              D
+      //            B    C         E    F
+      { 200, 100,  50,  50, 100,  10,  90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+      { 200, 110,  60,  50,  90,  90,   0 },  // used
+      {  10,   0,   0,   0,  10,   0,  10 },  // pending
+      {   0,   0,   0,   0,   0,   0,   0 },  // reserved
+      //          appA appB      appC appD
+      {   4,   2,   1,   1,   2,   1,   1 },  // apps
+      {  -1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from queueB (appA), not queueE (appC) despite
+    // queueE being far over its absolute capacity because queueA (queueB's
+    // parent) is over capacity and queueD (queueE's parent) is not.
+    ApplicationAttemptId expectedAttemptOnQueueB =
+        ApplicationAttemptId.newInstance(
+            appA.getApplicationId(), appA.getAttemptId());
+    assertTrue("appA should be running on queueB",
+        mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB and its children
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    ApplicationAttemptId expectedAttemptOnQueueC =
+        ApplicationAttemptId.newInstance(
+            appB.getApplicationId(), appB.getAttemptId());
+    ApplicationAttemptId expectedAttemptOnQueueE =
+        ApplicationAttemptId.newInstance(
+            appC.getApplicationId(), appC.getAttemptId());
+    // Now, all of queueB's (appA) over capacity is not preemptable, so neither
+    // is queueA's. Verify that capacity is taken from queueE (appC).
+    assertTrue("appB should be running on queueC",
+        mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
+    assertTrue("appC should be running on queueE",
+        mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
+    // Resources should have come from queueE (appC) and neither of queueA's
+    // children.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
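----------------------------------------------------------------------
The hierarchical test relies on children inheriting the parent's effective
flag, which cloneQueues() gets for free by passing the parent's value as the
default to getBoolean(). A pure-Java illustration of that lookup rule, with a
map standing in for the real Configuration (names and values illustrative):

import java.util.HashMap;
import java.util.Map;

public class InheritanceSketch {
  static final Map<String, Boolean> conf = new HashMap<>();

  // Effective flag for a dotted queue path such as "root.queueA.queueB":
  // an explicit entry wins, otherwise the parent's effective value applies.
  static boolean effectiveDisablePreemption(String path) {
    int dot = path.lastIndexOf('.');
    boolean parent =
        dot < 0 ? false : effectiveDisablePreemption(path.substring(0, dot));
    return conf.getOrDefault(path, parent);
  }

  public static void main(String[] args) {
    conf.put("root.queueA", true); // disable preemption for queueA only
    System.out.println(effectiveDisablePreemption("root.queueA.queueB")); // true (inherited)
    System.out.println(effectiveDisablePreemption("root.queueD.queueE")); // false
  }
}
----------------------------------------------------------------------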
+
+  @Test
+  public void testPerQueueDisablePreemptionBroadHierarchical() {
+    int[][] qData = new int[][] {
+      //   /    A              D              G
+      //             B    C         E    F         H    I
+      { 1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 },  // abs
+      { 1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+      { 1000, 400, 200, 200, 400, 250, 150, 200, 150,  50 },  // used
+      {   50,   0,   0,   0,  50,   0,  50,   0,   0,   0 },  // pending
+      {    0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+      //           appA appB      appC appD      appE appF
+      {    6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+      {   -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {    3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // queueF(appD) wants resources. Verify that resources come from
+    // queueE(appC) because it's a sibling, and from queueB(appA) because
+    // queueA is over capacity.
+    verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Need to call setup() again to reset mDisp
+    setup();
+    // Disable preemption for queueB(appA)
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    // Now that queueB(appA) is not preemptable, verify that resources come
+    // from queueE(appC)
+    verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    setup();
+    // Disable preemption for two of the three queues with over-capacity.
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
+    policy3.editSchedule();
+
+    // Verify that the request was starved out even though queueH(appE) is
+    // over capacity. This is because queueG (queueH's parent) is NOT
+    // over capacity.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueB
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionInheritParent() {
+    int[][] qData = new int[][] {
+      //   /    A                   E
+      //             B    C    D         F    G    H
+      { 1000, 500, 200, 200, 100, 500, 200, 200, 100 },  // abs (guar)
+      { 1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+      { 1000, 700,   0, 350, 350, 300,   0, 200, 100 },  // used
+      {  200,   0,   0,   0,   0, 200, 200,   0,   0 },  // pending
+      {    0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+      //           appA appB appC      appD appE
+      {    5,   2,   0,   1,   1,   3,   1,   1,   1 },  // apps
+      {   -1,  -1,   1,   1,   1,  -1,   1,   1,   1 },  // req granularity
+      {    2,   3,   0,   0,   0,   3,   0,   0,   0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // With all queues preemptable, resources should be taken from queueC(appA)
+    // and queueD(appB). More resources are taken from queueD(appB) than from
+    // queueC(appA) because queueD is over its capacity by a larger percentage.
+    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // Disable preemption for queueA and its children. queueF(appC)'s request
+    // should starve.
+    setup(); // Call setup() to reset mDisp
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+    policy2.editSchedule();
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueD
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueG
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+  }
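----------------------------------------------------------------------
The 16-versus-182 split in the first half of that test follows from how far
over its guarantee each queue sits; the values come straight from the qData
matrix above:

public class OverPctSketch {
  public static void main(String[] args) {
    // queueC: guaranteed 200, used 350; queueD: guaranteed 100, used 350.
    System.out.println(350.0 / 200.0); // 1.75 -> queueC at 175% of guarantee
    System.out.println(350.0 / 100.0); // 3.5  -> queueD at 350%, preempted harder
  }
}
----------------------------------------------------------------------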
+
+  @Test
+  public void testPerQueuePreemptionNotAllUntouchable() {
+    int[][] qData = new int[][] {
+      //    /     A                       E
+      //               B     C     D           F     G     H
+      { 2000, 1000,  800,  100,  100, 1000,  500,  300,  200 },  // abs
+      { 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000 },  // maxCap
+      { 2000, 1300,  300,  800,  200,  700,  500,    0,  200 },  // used
+      {  300,    0,    0,    0,    0,  300,    0,  300,    0 },  // pending
+      {    0,    0,    0,    0,    0,    0,    0,    0,    0 },  // reserved
+      //            appA  appB  appC        appD  appE  appF
+      {    6,    3,    1,    1,    1,    3,    1,    1,    1 },  // apps
+      {   -1,   -1,    1,    1,    1,   -1,    1,    1,    1 },  // req granularity
+      {    2,    3,    0,    0,    0,    3,    0,    0,    0 },  // subqueues
+    };
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // Although queueC(appB) is way over capacity and is untouchable,
+    // queueD(appC) is preemptable. Request should be filled from queueD(appC).
+    verify(mDisp, times(100)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testPerQueueDisablePreemptionRootDisablesAll() {
+    int[][] qData = new int[][] {
+      //   /    A              D              G
+      //             B    C         E    F         H    I
+      { 1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 },  // abs
+      { 1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 },  // maxCap
+      { 1000,  20,   0,  20, 490, 240, 250, 490, 240, 250 },  // used
+      {  200, 200, 200,   0,   0,   0,   0,   0,   0,   0 },  // pending
+      {    0,   0,   0,   0,   0,   0,   0,   0,   0,   0 },  // reserved
+      //           appA appB      appC appD      appE appF
+      {    6,   2,   1,   1,   2,   1,   1,   2,   1,   1 },  // apps
+      {   -1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {    3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+        + "root" + SUFFIX_DISABLE_PREEMPTION, true);
+    policy.editSchedule();
+    // All queues should be non-preemptable, so the request should starve.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueF
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C
"root" : parentPathName; + String queuePathName = (parentPathName+"."+queueName).replace("/","root"); + when(q.getQueuePath()).thenReturn(queuePathName); } assert 0 == pqs.size(); return root; @@ -666,6 +916,8 @@ public class TestProportionalCapacityPreemptionPolicy { LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { LeafQueue lq = mock(LeafQueue.class); + List appAttemptIdList = + new ArrayList(); when(lq.getTotalResourcePending()).thenReturn( Resource.newInstance(pending[i], 0)); // consider moving where CapacityScheduler::comparator accessible @@ -683,9 +935,14 @@ public class TestProportionalCapacityPreemptionPolicy { int aPending = pending[i] / apps[i]; int aReserve = reserved[i] / apps[i]; for (int a = 0; a < apps[i]; ++a) { - qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i])); + FiCaSchedulerApp mockFiCaApp = + mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]); + qApps.add(mockFiCaApp); ++appAlloc; + appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId()); } + when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1))) + .thenReturn(appAttemptIdList); } when(lq.getApplications()).thenReturn(qApps); if(setAMResourcePercent != 0.0f){