Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 772DF200C09 for ; Wed, 25 Jan 2017 19:51:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 75AA6160B5D; Wed, 25 Jan 2017 18:51:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 02A87160B5A for ; Wed, 25 Jan 2017 19:51:25 +0100 (CET) Received: (qmail 58439 invoked by uid 500); 25 Jan 2017 18:51:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 58269 invoked by uid 99); 25 Jan 2017 18:51:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jan 2017 18:51:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6979ADFD73; Wed, 25 Jan 2017 18:51:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kasha@apache.org To: common-commits@hadoop.apache.org Date: Wed, 25 Jan 2017 18:51:25 -0000 Message-Id: In-Reply-To: <663b6880fdee468ca7a4482563f1eeba@git.apache.org> References: <663b6880fdee468ca7a4482563f1eeba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] hadoop git commit: YARN-4752. Improved preemption in FairScheduler. 
(kasha) archived-at: Wed, 25 Jan 2017 18:51:27 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ceb7149/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 2ef2e76..f6df6a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -17,14 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import org.junit.Assert; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -40,7 +32,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -51,9 +45,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.File; +import java.util.ArrayList; +import java.util.List; + public class FairSchedulerTestBase { public final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); @@ -71,9 +73,14 @@ public class FairSchedulerTestBase { private static final int SLEEP_DURATION = 10; private static final int SLEEP_RETRIES = 1000; + /** + * The list of nodes added to the cluster using the {@link #addNode} method. 
+ */ + protected final List<RMNode> rmNodes = new ArrayList<>(); + + // Helper methods public Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); @@ -281,4 +288,18 @@ public class FairSchedulerTestBase { Assert.assertEquals(resource.getVirtualCores(), app.getCurrentConsumption().getVirtualCores()); } + + /** + * Add a node to the cluster and track the nodes in {@link #rmNodes}. + * @param memory memory capacity of the node + * @param cores cpu capacity of the node + */ + protected void addNode(int memory, int cores) { + int id = rmNodes.size() + 1; + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id, + "127.0.0." + id); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + rmNodes.add(node); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ceb7149/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java new file mode 100644 index 0000000..25780cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java @@ -0,0 +1,58 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.HashSet; +import java.util.Set; + +public class FairSchedulerWithMockPreemption extends FairScheduler { + @Override + protected void createPreemptionThread() { + preemptionThread = new MockPreemptionThread(this); + } + + static class MockPreemptionThread extends FSPreemptionThread { + private Set<FSAppAttempt> appsAdded = new HashSet<>(); + private int totalAppsAdded = 0; + + MockPreemptionThread(FairScheduler scheduler) { + super(scheduler); + } + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + FSAppAttempt app = context.getStarvedApps().take(); + appsAdded.add(app); + totalAppsAdded++; + } catch (InterruptedException e) { + return; + } + } + } + + int uniqueAppsAdded() { + return appsAdded.size(); + } + + int totalAppsAdded() { + return totalAppsAdded; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ceb7149/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index 5a170cf..e802f42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -86,11 +86,6 @@ public class FakeSchedulable implements Schedulable { } @Override - public RMContainer preemptContainer() { - return null; - } - - @Override public Resource getFairShare() { return this.fairShare; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ceb7149/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java new file mode 100644 index 0000000..a5b2d86 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; + +import org.junit.After; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +/** + * Test class to verify identification of app starvation + */ +public class TestFSAppStarvation extends FairSchedulerTestBase { + + private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES"); + + // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) + private static final int NODE_CAPACITY_MULTIPLE = 4; + private static final String[] QUEUES = + {"no-preemption", "minshare", "fairshare.child", "drf.child"}; + + private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread; + + @Before + public void setup() { + createConfiguration(); + conf.set(YarnConfiguration.RM_SCHEDULER, + FairSchedulerWithMockPreemption.class.getCanonicalName()); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + ALLOC_FILE.getAbsolutePath()); + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + } + + @After + public void teardown() { + ALLOC_FILE.delete(); + conf = null; + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + /* + * Test to verify application starvation is computed only when preemption + * is enabled. 
+ */ + @Test + public void testPreemptionDisabled() throws Exception { + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false); + + setupClusterAndSubmitJobs(); + + assertNull("Found starved apps even when preemption is turned off", + scheduler.getContext().getStarvedApps()); + } + + /* + * Test to verify application starvation is computed correctly when + * preemption is turned on. + */ + @Test + public void testPreemptionEnabled() throws Exception { + setupClusterAndSubmitJobs(); + + assertNotNull("FSContext does not have an FSStarvedApps instance", + scheduler.getContext().getStarvedApps()); + assertEquals("Expecting 3 starved applications, one each for the " + + "minshare and fairshare queues", + 3, preemptionThread.uniqueAppsAdded()); + + // Verify the apps get added again on a subsequent update + scheduler.update(); + Thread.yield(); + + verifyLeafQueueStarvation(); + assertTrue("Each app is marked as starved exactly once", + preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded()); + } + + /* + * Test to verify app starvation is computed only when the cluster + * utilization threshold is over the preemption threshold. 
+ */ + @Test + public void testClusterUtilizationThreshold() throws Exception { + // Set preemption threshold to 1.1, so the utilization is always lower + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f); + + setupClusterAndSubmitJobs(); + + assertNotNull("FSContext does not have an FSStarvedApps instance", + scheduler.getContext().getStarvedApps()); + assertEquals("Found starved apps when preemption threshold is over 100%", 0, + preemptionThread.totalAppsAdded()); + } + + private void verifyLeafQueueStarvation() { + for (String q : QUEUES) { + if (!q.equals("no-preemption")) { + boolean isStarved = + scheduler.getQueueManager().getLeafQueue(q, false).isStarved(); + assertTrue(isStarved); + } + } + } + + private void setupClusterAndSubmitJobs() throws Exception { + setupStarvedCluster(); + submitAppsToEachLeafQueue(); + sendEnoughNodeUpdatesToAssignFully(); + + // Sleep to hit the preemption timeouts + Thread.sleep(10); + + // Scheduler update to populate starved apps + scheduler.update(); + + // Wait for apps to be processed by MockPreemptionThread + Thread.yield(); + } + + /** + * Setup the cluster for starvation testing: + * 1. Create FS allocation file + * 2. Create and start MockRM + * 3. Add two nodes to the cluster + * 4. 
Submit an app that uses up all resources on the cluster + */ + private void setupStarvedCluster() throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + + // Default queue + out.println("<queue name=\"default\">"); + out.println("</queue>"); + + // Queue with preemption disabled + out.println("<queue name=\"no-preemption\">"); + out.println("<fairSharePreemptionThreshold>0" + + "</fairSharePreemptionThreshold>"); + out.println("</queue>"); + + // Queue with minshare preemption enabled + out.println("<queue name=\"minshare\">"); + out.println("<fairSharePreemptionThreshold>0" + + "</fairSharePreemptionThreshold>"); + out.println("<minSharePreemptionTimeout>0" + + "</minSharePreemptionTimeout>"); + out.println("<minResources>2048mb,2vcores</minResources>"); + out.println("</queue>"); + + // FAIR queue with fairshare preemption enabled + out.println("<queue name=\"fairshare\">"); + out.println("<fairSharePreemptionThreshold>1" + + "</fairSharePreemptionThreshold>"); + out.println("<fairSharePreemptionTimeout>0" + + "</fairSharePreemptionTimeout>"); + out.println("<schedulingPolicy>fair</schedulingPolicy>"); + addChildQueue(out); + out.println("</queue>"); + + // DRF queue with fairshare preemption enabled + out.println("<queue name=\"drf\">"); + out.println("<fairSharePreemptionThreshold>1" + + "</fairSharePreemptionThreshold>"); + out.println("<fairSharePreemptionTimeout>0" + + "</fairSharePreemptionTimeout>"); + out.println("<schedulingPolicy>drf</schedulingPolicy>"); + addChildQueue(out); + out.println("</queue>"); + + out.println("</allocations>"); + out.close(); + + assertTrue("Allocation file does not exist, not running the test", + ALLOC_FILE.exists()); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread) + scheduler.preemptionThread; + + // Create and add two nodes to the cluster + addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); + + // Create an app that takes up all the resources on the cluster + ApplicationAttemptId app + = createSchedulingRequest(1024, 1, "root.default", "default", 8); + + scheduler.update(); + sendEnoughNodeUpdatesToAssignFully(); + + assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); + } + + private void addChildQueue(PrintWriter out) { + // Child queue under fairshare with same settings + out.println("<queue name=\"child\">"); + out.println("<fairSharePreemptionThreshold>1" + + "</fairSharePreemptionThreshold>"); + out.println("<fairSharePreemptionTimeout>0" + + "</fairSharePreemptionTimeout>"); + out.println("</queue>"); + } + + 
private void submitAppsToEachLeafQueue() { + for (String queue : QUEUES) { + createSchedulingRequest(1024, 1, "root." + queue, "user", 1); + } + scheduler.update(); + } + + private void sendEnoughNodeUpdatesToAssignFully() { + for (RMNode node : rmNodes) { + NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = + new NodeUpdateSchedulerEvent(node); + for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) { + scheduler.handle(nodeUpdateSchedulerEvent); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ceb7149/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 0a2ce81..98de8db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -106,12 +105,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); 
out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); + out.println(""); + out.println(""); out.println(""); out.close(); @@ -144,162 +139,6 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { scheduler.update(); Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); - - // Queue A should be above min share, B below. - FSLeafQueue queueA = - scheduler.getQueueManager().getLeafQueue("queueA", false); - FSLeafQueue queueB = - scheduler.getQueueManager().getLeafQueue("queueB", false); - assertFalse(queueA.isStarvedForMinShare()); - assertTrue(queueB.isStarvedForMinShare()); - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // Now B should have min share ( = demand here) - assertFalse(queueB.isStarvedForMinShare()); - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(".2"); - out.println(""); - out.println(""); - out.println(".8"); - out.println(".4"); - out.println(""); - out.println(""); - out.println(""); - out.println(".6"); - out.println(""); - out.println(""); - out.println(".5"); - out.println(""); - out.close(); - - resourceManager = new MockRM(conf); - resourceManager.start(); - scheduler = (FairScheduler) resourceManager.getResourceScheduler(); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - scheduler.update(); - - // Queue A wants 4 * 1024. 
Node update gives this all to A - createSchedulingRequest(1 * 1024, "queueA", "user1", 4); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - for (int i = 0; i < 4; i ++) { - scheduler.handle(nodeEvent2); - } - - QueueManager queueMgr = scheduler.getQueueManager(); - FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); - assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize()); - - // Both queue B1 and queue B2 want 3 * 1024 - createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); - createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); - scheduler.update(); - for (int i = 0; i < 4; i ++) { - scheduler.handle(nodeEvent2); - } - - FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); - FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); - assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize()); - assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize()); - - // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share - // threshold is 1.6 * 1024 - assertFalse(queueB1.isStarvedForFairShare()); - - // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share - // threshold is 2.4 * 1024 - assertTrue(queueB2.isStarvedForFairShare()); - - // Node checks in again - scheduler.handle(nodeEvent2); - scheduler.handle(nodeEvent2); - assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize()); - assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize()); - - // Both queue B1 and queue B2 usages go to 3 * 1024 - assertFalse(queueB1.isStarvedForFairShare()); - assertFalse(queueB2.isStarvedForFairShare()); - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShareDRF() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - 
out.println(".5"); - out.println(""); - out.println(""); - out.println(".5"); - out.println(""); - out.println("1"); - out.println("drf"); - out.println(""); - out.close(); - - resourceManager = new MockRM(conf); - resourceManager.start(); - scheduler = (FairScheduler) resourceManager.getResourceScheduler(); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - scheduler.update(); - - // Queue A wants 7 * 1024, 1. Node update gives this all to A - createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - QueueManager queueMgr = scheduler.getQueueManager(); - FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); - assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize()); - assertEquals(1, queueA.getResourceUsage().getVirtualCores()); - - // Queue B has 3 reqs : - // 1) 2 * 1024, 5 .. which will be granted - // 2) 1 * 1024, 1 .. which will be granted - // 3) 1 * 1024, 1 .. which wont - createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1); - createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2); - scheduler.update(); - for (int i = 0; i < 3; i ++) { - scheduler.handle(nodeEvent2); - } - - FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false); - assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize()); - assertEquals(6, queueB.getResourceUsage().getVirtualCores()); - - scheduler.update(); - - // Verify that Queue us not starved for fair share.. 
- // Since the Starvation logic now uses DRF when the policy = drf, The - // Queue should not be starved - assertFalse(queueB.isStarvedForFairShare()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org