Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2695D18E01 for ; Tue, 23 Feb 2016 10:55:35 +0000 (UTC) Received: (qmail 86297 invoked by uid 500); 23 Feb 2016 10:55:12 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 85958 invoked by uid 500); 23 Feb 2016 10:55:12 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 85700 invoked by uid 99); 23 Feb 2016 10:55:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Feb 2016 10:55:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7B083E10C8; Tue, 23 Feb 2016 10:55:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ozawa@apache.org To: common-commits@hadoop.apache.org Date: Tue, 23 Feb 2016 10:55:13 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-4648. Move preemption related tests from TestFairScheduler to TestFairSchedulerPreemption. Contributed by Kai Sasaki. YARN-4648. Move preemption related tests from TestFairScheduler to TestFairSchedulerPreemption. Contributed by Kai Sasaki. (cherry picked from commit 0e12114c9ccb0c6b16c258227a433b075418796e) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ee55d03 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ee55d03 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ee55d03 Branch: refs/heads/branch-2 Commit: 4ee55d0322f2e97d447acdd5c7bf68cc7b7ac13a Parents: 56b82de Author: Tsuyoshi Ozawa Authored: Tue Feb 23 19:50:08 2016 +0900 Committer: Tsuyoshi Ozawa Committed: Tue Feb 23 19:50:40 2016 +0900 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/TestFairScheduler.java | 1204 ---------------- .../fair/TestFairSchedulerPreemption.java | 1295 +++++++++++++++++- 3 files changed, 1293 insertions(+), 1209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee55d03/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 37deb3c..14ecd2b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -79,6 +79,9 @@ Release 2.9.0 - UNRELEASED YARN-4708. Missing default mapper type in TimelineServer performance test tool usage. (Kai Sasaki via ozawa) + YARN-4648. Move preemption related tests from TestFairScheduler to + TestFairSchedulerPreemption. (Kai Sasaki via ozawa) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee55d03/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 8b5263c..a7d2753 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -94,11 +93,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -1793,1122 +1790,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01); } - @Test (timeout = 5000) - /** - * Make sure containers are chosen to be preempted in the correct order. - */ - public void testChoiceOfPreemptedContainers() throws Exception { - conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); - conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); - - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(".25"); - out.println(""); - out.println(""); - out.println(".25"); - out.println(""); - out.println(""); - out.println(".25"); - out.println(""); - out.println(""); - out.println(".25"); - out.println(""); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Create two nodes - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - - // Queue A and B each request two applications - ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1); - createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); - ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3); - createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2); - - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1); - createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3); - ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3); - createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4); - - scheduler.update(); - - scheduler.getQueueManager().getLeafQueue("queueA", true) - .setPolicy(SchedulingPolicy.parse("fifo")); - scheduler.getQueueManager().getLeafQueue("queueB", true) - .setPolicy(SchedulingPolicy.parse("fair")); - - // Sufficient node check-ins to fully schedule containers - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - for (int i = 0; i < 4; i++) { - scheduler.handle(nodeUpdate1); - scheduler.handle(nodeUpdate2); - } - - assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - - // Now new requests arrive from queueC and default - createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); - createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); - createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); - createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); - scheduler.update(); - - // We should be able to claw back one container from queueA and queueB each. - scheduler.preemptResources(Resources.createResource(2 * 1024)); - assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - - // First verify we are adding containers to preemption list for the app. - // For queueA (fifo), app2 is selected. - // For queueB (fair), app4 is selected. - assertTrue("App2 should have container to be preempted", - !Collections.disjoint( - scheduler.getSchedulerApp(app2).getLiveContainers(), - scheduler.getSchedulerApp(app2).getPreemptionContainers())); - assertTrue("App4 should have container to be preempted", - !Collections.disjoint( - scheduler.getSchedulerApp(app2).getLiveContainers(), - scheduler.getSchedulerApp(app2).getPreemptionContainers())); - - // Pretend 15 seconds have passed - clock.tickSec(15); - - // Trigger a kill by insisting we want containers back - scheduler.preemptResources(Resources.createResource(2 * 1024)); - - // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - // Inside each app, containers are sorted according to their priorities. - // Containers with priority 4 are preempted for app2 and app4. - Set set = new HashSet(); - for (RMContainer container : - scheduler.getSchedulerApp(app2).getLiveContainers()) { - if (container.getAllocatedPriority().getPriority() == 4) { - set.add(container); - } - } - for (RMContainer container : - scheduler.getSchedulerApp(app4).getLiveContainers()) { - if (container.getAllocatedPriority().getPriority() == 4) { - set.add(container); - } - } - assertTrue("Containers with priority=4 in app2 and app4 should be " + - "preempted.", set.isEmpty()); - - // Trigger a kill by insisting we want containers back - scheduler.preemptResources(Resources.createResource(2 * 1024)); - - // Pretend 15 seconds have passed - clock.tickSec(15); - - // We should be able to claw back another container from A and B each. - // For queueA (fifo), continue preempting from app2. - // For queueB (fair), even app4 has a lowest priority container with p=4, it - // still preempts from app3 as app3 is most over fair share. - scheduler.preemptResources(Resources.createResource(2 * 1024)); - - assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - - // Now A and B are below fair share, so preemption shouldn't do anything - scheduler.preemptResources(Resources.createResource(2 * 1024)); - assertTrue("App1 should have no container to be preempted", - scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty()); - assertTrue("App2 should have no container to be preempted", - scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty()); - assertTrue("App3 should have no container to be preempted", - scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty()); - assertTrue("App4 should have no container to be preempted", - scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); - } - - @Test - public void testPreemptionIsNotDelayedToNextRound() throws Exception { - conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); - - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("8"); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println("2"); - out.println(""); - out.println("10"); - out.println(".5"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add a node of 8G - RMNode node1 = MockNodes.newNodeInfo(1, - Resources.createResource(8 * 1024, 8), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Run apps in queueA.A1 and queueB - ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1, - "queueA.queueA1", "user1", 7, 1); - // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); - ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB", - "user2", 1, 1); - - scheduler.update(); - - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - for (int i = 0; i < 8; i++) { - scheduler.handle(nodeUpdate1); - } - - // verify if the apps got the containers they requested - assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - - // Now submit an app in queueA.queueA2 - ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1, - "queueA.queueA2", "user3", 7, 1); - scheduler.update(); - - // Let 11 sec pass - clock.tickSec(11); - - scheduler.update(); - Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager() - .getLeafQueue("queueA.queueA2", false), clock.getTime()); - assertEquals(3277, toPreempt.getMemory()); - - // verify if the 3 containers required by queueA2 are preempted in the same - // round - scheduler.preemptResources(toPreempt); - assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() - .size()); - } - - @Test (timeout = 5000) - /** - * Tests the timing of decision to preempt tasks. - */ - public void testPreemptionDecision() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("0mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println("5"); - out.println("10"); - out.println(".5"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Create four nodes - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - // Queue A and B each request three containers - ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); - ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); - - ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); - ApplicationAttemptId app5 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); - ApplicationAttemptId app6 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); - - scheduler.update(); - - // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); - } - - // Now new requests arrive from queues C and D - ApplicationAttemptId app7 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); - ApplicationAttemptId app8 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); - ApplicationAttemptId app9 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); - - ApplicationAttemptId app10 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); - ApplicationAttemptId app11 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); - ApplicationAttemptId app12 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); - - scheduler.update(); - - FSLeafQueue schedC = - scheduler.getQueueManager().getLeafQueue("queueC", true); - FSLeafQueue schedD = - scheduler.getQueueManager().getLeafQueue("queueD", true); - - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime()))); - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime()))); - // After minSharePreemptionTime has passed, they should want to preempt min - // share. - clock.tickSec(6); - assertEquals( - 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory()); - assertEquals( - 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); - - // After fairSharePreemptionTime has passed, they should want to preempt - // fair share. - scheduler.update(); - clock.tickSec(6); - assertEquals( - 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory()); - assertEquals( - 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); - } - - @Test -/** - * Tests the timing of decision to preempt tasks. - */ - public void testPreemptionDecisionWithDRF() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("0mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,1vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,2vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,3vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,2vcores"); - out.println(""); - out.println("5"); - out.println("10"); - out.println(".5"); - out.println("drf"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Create four nodes - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - // Queue A and B each request three containers - ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); - ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); - - ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); - ApplicationAttemptId app5 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); - ApplicationAttemptId app6 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); - - scheduler.update(); - - // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); - } - - // Now new requests arrive from queues C and D - ApplicationAttemptId app7 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); - ApplicationAttemptId app8 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); - ApplicationAttemptId app9 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); - - ApplicationAttemptId app10 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1); - ApplicationAttemptId app11 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2); - ApplicationAttemptId app12 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3); - - scheduler.update(); - - FSLeafQueue schedC = - scheduler.getQueueManager().getLeafQueue("queueC", true); - FSLeafQueue schedD = - scheduler.getQueueManager().getLeafQueue("queueD", true); - - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime()))); - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime()))); - - // Test : - // 1) whether componentWise min works as expected. - // 2) DRF calculator is used - - // After minSharePreemptionTime has passed, they should want to preempt min - // share. - clock.tickSec(6); - Resource res = scheduler.resourceDeficit(schedC, clock.getTime()); - assertEquals(1024, res.getMemory()); - // Demand = 3 - assertEquals(3, res.getVirtualCores()); - - res = scheduler.resourceDeficit(schedD, clock.getTime()); - assertEquals(1024, res.getMemory()); - // Demand = 6, but min share = 2 - assertEquals(2, res.getVirtualCores()); - - // After fairSharePreemptionTime has passed, they should want to preempt - // fair share. - scheduler.update(); - clock.tickSec(6); - res = scheduler.resourceDeficit(schedC, clock.getTime()); - assertEquals(1536, res.getMemory()); - assertEquals(3, res.getVirtualCores()); - - res = scheduler.resourceDeficit(schedD, clock.getTime()); - assertEquals(1536, res.getMemory()); - // Demand = 6, but fair share = 3 - assertEquals(3, res.getVirtualCores()); - } - - @Test - /** - * Tests the various timing of decision to preempt tasks. - */ - public void testPreemptionDecisionWithVariousTimeout() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("0mb,0vcores"); - out.println(""); - out.println(""); - out.println("1"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println("2"); - out.println("10"); - out.println("25"); - out.println(""); - out.println("1024mb,0vcores"); - out.println("5"); - out.println(""); - out.println(""); - out.println("1024mb,0vcores"); - out.println("20"); - out.println(""); - out.println(""); - out.println(""); - out.println("1"); - out.println("1024mb,0vcores"); - out.println(""); - out.print("15"); - out.print("30"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Check the min/fair share preemption timeout for each queue - QueueManager queueMgr = scheduler.getQueueManager(); - assertEquals(30000, queueMgr.getQueue("root") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("default") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueA") - .getFairSharePreemptionTimeout()); - assertEquals(25000, queueMgr.getQueue("queueB") - .getFairSharePreemptionTimeout()); - assertEquals(25000, queueMgr.getQueue("queueB.queueB1") - .getFairSharePreemptionTimeout()); - assertEquals(20000, queueMgr.getQueue("queueB.queueB2") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueC") - .getFairSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("root") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("default") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("queueA") - .getMinSharePreemptionTimeout()); - assertEquals(10000, queueMgr.getQueue("queueB") - .getMinSharePreemptionTimeout()); - assertEquals(5000, queueMgr.getQueue("queueB.queueB1") - .getMinSharePreemptionTimeout()); - assertEquals(10000, queueMgr.getQueue("queueB.queueB2") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("queueC") - .getMinSharePreemptionTimeout()); - - // Create one big node - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A takes all resources - for (int i = 0; i < 6; i ++) { - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); - } - - scheduler.update(); - - // Sufficient node check-ins to fully schedule containers - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - for (int i = 0; i < 6; i++) { - scheduler.handle(nodeUpdate1); - } - - // Now new requests arrive from queues B1, B2 and C - createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1); - createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2); - createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3); - createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1); - createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2); - createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3); - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); - - scheduler.update(); - - FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true); - FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true); - FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true); - - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime()))); - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime()))); - assertTrue(Resources.equals( - Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime()))); - - // After 5 seconds, queueB1 wants to preempt min share - scheduler.update(); - clock.tickSec(6); - assertEquals( - 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); - assertEquals( - 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); - assertEquals( - 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); - - // After 10 seconds, queueB2 wants to preempt min share - scheduler.update(); - clock.tickSec(5); - assertEquals( - 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); - assertEquals( - 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); - assertEquals( - 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); - - // After 15 seconds, queueC wants to preempt min share - scheduler.update(); - clock.tickSec(5); - assertEquals( - 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); - assertEquals( - 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); - assertEquals( - 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); - - // After 20 seconds, queueB2 should want to preempt fair share - scheduler.update(); - clock.tickSec(5); - assertEquals( - 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); - assertEquals( - 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); - assertEquals( - 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); - - // After 25 seconds, queueB1 should want to preempt fair share - scheduler.update(); - clock.tickSec(5); - assertEquals( - 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); - assertEquals( - 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); - assertEquals( - 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); - - // After 30 seconds, queueC should want to preempt fair share - scheduler.update(); - clock.tickSec(5); - assertEquals( - 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); - assertEquals( - 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); - assertEquals( - 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); - } - - @Test - /** - * Tests the decision to preempt tasks respect to non-preemptable queues - * 1, Queues as follow: - * queueA(non-preemptable) - * queueB(preemptable) - * parentQueue(non-preemptable) - * --queueC(preemptable) - * queueD(preemptable) - * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare - * 3, Now all resource are occupied - * 4, Submit request to queueD, and need to preempt resource from other queues - * 5, Only preemptable queue(queueB) would be preempted. - */ - public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("0mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println("false"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println("false"); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println(""); - out.println(".25"); - out.println("2048mb,0vcores"); - out.println(""); - out.println("5"); - out.println("10"); - out.println(".5"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Create four nodes(3G each) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - RMNode node4 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, - "127.0.0.4"); - NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); - scheduler.handle(nodeEvent4); - - // Submit apps to queueA, queueB, queueC, - // now all resource of the cluster is occupied - ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); - ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3); - - scheduler.update(); - - // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 3; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); - - NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); - scheduler.handle(nodeUpdate4); - } - - assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - - // Now new requests arrive from queues D - ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1); - scheduler.update(); - FSLeafQueue schedD = - scheduler.getQueueManager().getLeafQueue("queueD", true); - - // After minSharePreemptionTime has passed, 2G resource should preempted from - // queueB to queueD - clock.tickSec(6); - assertEquals(2048, - scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); - - scheduler.preemptResources(Resources.createResource(2 * 1024)); - // now only app2 is selected to be preempted - assertTrue("App2 should have container to be preempted", - !Collections.disjoint( - scheduler.getSchedulerApp(app2).getLiveContainers(), - scheduler.getSchedulerApp(app2).getPreemptionContainers())); - assertTrue("App1 should not have container to be preempted", - Collections.disjoint( - scheduler.getSchedulerApp(app1).getLiveContainers(), - scheduler.getSchedulerApp(app1).getPreemptionContainers())); - assertTrue("App3 should not have container to be preempted", - Collections.disjoint( - scheduler.getSchedulerApp(app3).getLiveContainers(), - scheduler.getSchedulerApp(app3).getPreemptionContainers())); - // Pretend 20 seconds have passed - clock.tickSec(20); - scheduler.preemptResources(Resources.createResource(2 * 1024)); - for (int i = 0; i < 3; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); - - NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); - scheduler.handle(nodeUpdate4); - } - // after preemption - assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - } - - @Test - /** - * Tests the decision to preempt tasks when allowPreemptionFrom is set false on - * all queues. - * Then none of them would be preempted actually. - * 1, Queues as follow: - * queueA(non-preemptable) - * queueB(non-preemptable) - * parentQueue(non-preemptable) - * --queueC(preemptable) - * parentQueue(preemptable) - * --queueD(non-preemptable) - * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare - * 3, Now all resource are occupied - * 4, Submit request to queueA, and need to preempt resource from other queues - * 5, None of queues would be preempted. - */ - public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues() - throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("0mb,0vcores"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("2048mb,0vcores"); - out.println("false"); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println("false"); - out.println(""); - out.println(""); - out.println("false"); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(".25"); - out.println("1024mb,0vcores"); - out.println("false"); - out.println(""); - out.println(""); - out.println("5"); - out.println("10"); - out.println(".5"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Create four nodes(3G each) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, - "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - scheduler.handle(nodeEvent2); - - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - RMNode node4 = - MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, - "127.0.0.4"); - NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); - scheduler.handle(nodeEvent4); - - // Submit apps to queueB, queueC, queueD - // now all resource of the cluster is occupied - - ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1); - ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3); - scheduler.update(); - - // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 3; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); - - NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); - scheduler.handle(nodeUpdate4); - } - - assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - - // Now new requests arrive from queues A - ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); - scheduler.update(); - FSLeafQueue schedA = - scheduler.getQueueManager().getLeafQueue("queueA", true); - - // After minSharePreemptionTime has passed, resource deficit is 2G - clock.tickSec(6); - assertEquals(2048, - scheduler.resourceDeficit(schedA, clock.getTime()).getMemory()); - - scheduler.preemptResources(Resources.createResource(2 * 1024)); - // now none app is selected to be preempted - assertTrue("App1 should have container to be preempted", - Collections.disjoint( - scheduler.getSchedulerApp(app1).getLiveContainers(), - scheduler.getSchedulerApp(app1).getPreemptionContainers())); - assertTrue("App2 should not have container to be preempted", - Collections.disjoint( - scheduler.getSchedulerApp(app2).getLiveContainers(), - scheduler.getSchedulerApp(app2).getPreemptionContainers())); - assertTrue("App3 should not have container to be preempted", - Collections.disjoint( - scheduler.getSchedulerApp(app3).getLiveContainers(), - scheduler.getSchedulerApp(app3).getPreemptionContainers())); - // Pretend 20 seconds have passed - clock.tickSec(20); - scheduler.preemptResources(Resources.createResource(2 * 1024)); - for (int i = 0; i < 3; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); - scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); - - NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); - scheduler.handle(nodeUpdate4); - } - // after preemption - assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - } - - @Test - public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println("5"); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.print("15"); - out.print("30"); - out.print("40"); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Check the min/fair share preemption timeout for each queue - QueueManager queueMgr = scheduler.getQueueManager(); - assertEquals(30000, queueMgr.getQueue("root") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("default") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueA") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueB") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueB.queueB1") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueB.queueB2") - .getFairSharePreemptionTimeout()); - assertEquals(30000, queueMgr.getQueue("queueC") - .getFairSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("root") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("default") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("queueA") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("queueB") - .getMinSharePreemptionTimeout()); - assertEquals(5000, queueMgr.getQueue("queueB.queueB1") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("queueB.queueB2") - .getMinSharePreemptionTimeout()); - assertEquals(15000, queueMgr.getQueue("queueC") - .getMinSharePreemptionTimeout()); - - // If both exist, we take the default one - out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println("5"); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.print("15"); - out.print("25"); - out.print("30"); - out.println(""); - out.close(); - - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - assertEquals(25000, queueMgr.getQueue("root") - .getFairSharePreemptionTimeout()); - } - @Test(timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.init(conf); @@ -5062,91 +3943,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { } } } - - @Test(timeout = 5000) - public void testRecoverRequestAfterPreemption() throws Exception { - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); - - ControlledClock clock = new ControlledClock(); - scheduler.setClock(clock); - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - Priority priority = Priority.newInstance(20); - String host = "127.0.0.1"; - int GB = 1024; - - // Create Node and raised Node Added event - RMNode node = MockNodes.newNodeInfo(1, - Resources.createResource(16 * 1024, 4), 0, host); - NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - scheduler.handle(nodeEvent); - - // Create 3 container requests and place it in ask - List ask = new ArrayList(); - ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host, - priority.getPriority(), 1, true); - ResourceRequest rackLocalRequest = createResourceRequest(GB, 1, - node.getRackName(), priority.getPriority(), 1, true); - ResourceRequest offRackRequest = createResourceRequest(GB, 1, - ResourceRequest.ANY, priority.getPriority(), 1, true); - ask.add(nodeLocalRequest); - ask.add(rackLocalRequest); - ask.add(offRackRequest); - - // Create Request and update - ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA", - "user1", ask); - scheduler.update(); - - // Sufficient node check-ins to fully schedule containers - NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); - scheduler.handle(nodeUpdate); - - assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() - .size()); - SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId); - - // ResourceRequest will be empty once NodeUpdate is completed - Assert.assertNull(app.getResourceRequest(priority, host)); - - ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); - RMContainer rmContainer = app.getRMContainer(containerId1); - - // Create a preempt event and register for preemption - scheduler.warnOrKillContainer(rmContainer); - - // Wait for few clock ticks - clock.tickSec(5); - - // preempt now - scheduler.warnOrKillContainer(rmContainer); - - // Trigger container rescheduled event - scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); - - List requests = rmContainer.getResourceRequests(); - // Once recovered, resource request will be present again in app - Assert.assertEquals(3, requests.size()); - for (ResourceRequest request : requests) { - Assert.assertEquals(1, - app.getResourceRequest(priority, request.getResourceName()) - .getNumContainers()); - } - - // Send node heartbeat - scheduler.update(); - scheduler.handle(nodeUpdate); - - List containers = scheduler.allocate(appAttemptId, - Collections. emptyList(), - Collections. emptyList(), null, null, null, null).getContainers(); - - // Now with updated ResourceRequest, a container is allocated for AM. - Assert.assertTrue(containers.size() == 1); - } @Test public void testBlacklistNodes() throws Exception {