Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B0A8511642 for ; Tue, 12 Aug 2014 21:43:57 +0000 (UTC) Received: (qmail 16696 invoked by uid 500); 12 Aug 2014 21:43:57 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 16656 invoked by uid 500); 12 Aug 2014 21:43:57 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 16643 invoked by uid 99); 12 Aug 2014 21:43:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Aug 2014 21:43:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Aug 2014 21:43:31 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D00F12388C38; Tue, 12 Aug 2014 21:43:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1617600 [2/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ hadoop-yarn/hadoop-yarn-server/ha... Date: Tue, 12 Aug 2014 21:43:28 -0000 To: yarn-commits@hadoop.apache.org From: kasha@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140812214328.D00F12388C38@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java?rev=1617600&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java Tue Aug 12 21:43:27 2014 @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.util.Clock; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFSAppAttempt extends FairSchedulerTestBase { + + private class MockClock implements Clock { + private long time = 0; + @Override + public long getTime() { + return time; + } + + public void tick(int seconds) { + time = time + seconds * 1000; + } + + } + + @Before + public void setup() { + Configuration conf = createConfiguration(); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + } + + @Test + public void testDelayScheduling() { + FSLeafQueue queue = Mockito.mock(FSLeafQueue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + double nodeLocalityThreshold = .5; + double rackLocalityThreshold = .6; + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + FSAppAttempt schedulerApp = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue , + null, rmContext); + + // Default level should be node-local + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + // First five scheduling opportunities should remain node local + for (int i = 0; i < 5; i++) { + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + // After five it should switch to rack local + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + // Manually set back to node local + schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL); + schedulerApp.resetSchedulingOpportunities(prio); + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + // Now escalate again to rack-local, then to off-switch + for (int i = 0; i < 5; i++) { + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + + for (int i = 0; i < 6; i++) { + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + schedulerApp.addSchedulingOpportunity(prio); + assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( + prio, 10, nodeLocalityThreshold, rackLocalityThreshold)); + } + + @Test + public void testDelaySchedulingForContinuousScheduling() + throws InterruptedException { + FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queue", true); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds + long rackLocalityDelayMs = 6 * 1000L; // 6 seconds + + RMContext rmContext = resourceManager.getRMContext(); + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSAppAttempt schedulerApp = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue, + null, rmContext); + + // Default level should be node-local + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // after 4 seconds should remain node local + clock.tick(4); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // after 6 seconds should switch to rack local + clock.tick(2); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // manually set back to node local + schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL); + schedulerApp.resetSchedulingOpportunities(prio, clock.getTime()); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + // Now escalate again to rack-local, then to off-switch + clock.tick(6); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + + clock.tick(7); + assertEquals(NodeType.OFF_SWITCH, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime())); + } + + @Test + /** + * Ensure that when negative paramaters are given (signaling delay scheduling + * no tin use), the least restrictive locality level is returned. + */ + public void testLocalityLevelWithoutDelays() { + FSLeafQueue queue = Mockito.mock(FSLeafQueue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + + RMContext rmContext = resourceManager.getRMContext(); + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSAppAttempt schedulerApp = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue , + null, rmContext); + assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( + prio, 10, -1.0, -1.0)); + } +} Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1617600&r1=1617599&r2=1617600&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Tue Aug 12 21:43:27 2014 @@ -62,7 +62,7 @@ public class TestFSLeafQueue { @Test public void testUpdateDemand() { - AppSchedulable app = mock(AppSchedulable.class); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); schedulable.addAppSchedulable(app); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1617600&r1=1617599&r2=1617600&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Aug 12 21:43:27 2014 @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -82,13 +81,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -1539,7 +1536,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); ContainerId containerId = scheduler.getSchedulerApp(attId) @@ -1613,9 +1610,9 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @@ -1688,8 +1685,8 @@ public class TestFairScheduler extends F "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1731,7 +1728,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1766,7 +1763,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(0, 1, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1830,10 +1827,10 @@ public class TestFairScheduler extends F ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1952,7 +1949,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -2025,7 +2022,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2066,7 +2063,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2101,7 +2098,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -2143,7 +2140,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2165,10 +2162,10 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2208,13 +2205,13 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2247,19 +2244,19 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2341,7 +2338,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2353,14 +2350,14 @@ public class TestFairScheduler extends F } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); - Collection runnableApps = + Collection runnableApps = queue.getRunnableAppSchedulables(); - Collection nonRunnableApps = + Collection nonRunnableApps = queue.getNonRunnableAppSchedulables(); - assertEquals(runnable, runnableApps.contains(app.getAppSchedulable())); - assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable())); + assertEquals(runnable, runnableApps.contains(app)); + assertEquals(!runnable, nonRunnableApps.contains(app)); } private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue, @@ -2465,7 +2462,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 1024 MB memory", @@ -2479,7 +2476,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 1024 MB memory", @@ -2493,7 +2490,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId3 = createAppAttemptId(3, 1); createApplicationWithAMResource(attId3, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application3's AM requests 1024 MB memory", @@ -2529,7 +2526,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId4 = createAppAttemptId(4, 1); createApplicationWithAMResource(attId4, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application4's AM requests 2048 MB memory", @@ -2543,7 +2540,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId5 = createAppAttemptId(5, 1); createApplicationWithAMResource(attId5, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5); - FSSchedulerApp app5 = scheduler.getSchedulerApp(attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application5's AM requests 2048 MB memory", @@ -2586,7 +2583,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId6 = createAppAttemptId(6, 1); createApplicationWithAMResource(attId6, "queue1", "user1", amResource3); createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6); - FSSchedulerApp app6 = scheduler.getSchedulerApp(attId6); + FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", @@ -2677,7 +2674,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 2048 MB memory", @@ -2691,7 +2688,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 2048 MB memory", @@ -2823,7 +2820,7 @@ public class TestFairScheduler extends F // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSAppAttempt app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -3007,7 +3004,7 @@ public class TestFairScheduler extends F assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -3063,7 +3060,7 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.emptyList(), @@ -3171,12 +3168,10 @@ public class TestFairScheduler extends F assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand()); scheduler.moveApplication(appId, "queue2"); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); assertSame(targetQueue, app.getQueue()); - assertFalse(oldQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage()); assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage()); assertEquals(0, oldQueue.getNumRunnableApps()); @@ -3224,17 +3219,13 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId = createSchedulingRequest(1024, 1, "queue1", "user1", 3); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); - assertTrue(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); + assertTrue(oldQueue.getNonRunnableAppSchedulables().contains(app)); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); - assertFalse(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertFalse(targetQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getNonRunnableAppSchedulables().contains(app)); + assertFalse(targetQueue.getNonRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(1, targetQueue.getNumRunnableApps()); assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps()); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java?rev=1617600&r1=1617599&r2=1617600&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java Tue Aug 12 21:43:27 2014 @@ -42,12 +42,13 @@ public class TestMaxRunningAppsEnforcer private int appNum; private TestFairScheduler.MockClock clock; private RMContext rmContext; + private FairScheduler scheduler; @Before public void setup() throws Exception { Configuration conf = new Configuration(); clock = new TestFairScheduler.MockClock(); - FairScheduler scheduler = mock(FairScheduler.class); + scheduler = mock(FairScheduler.class); when(scheduler.getConf()).thenReturn( new FairSchedulerConfiguration(conf)); when(scheduler.getClock()).thenReturn(clock); @@ -65,11 +66,11 @@ public class TestMaxRunningAppsEnforcer when(rmContext.getEpoch()).thenReturn(0); } - private FSSchedulerApp addApp(FSLeafQueue queue, String user) { + private FSAppAttempt addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, + FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null, rmContext); queue.addApp(app, runnable); if (runnable) { @@ -80,7 +81,7 @@ public class TestMaxRunningAppsEnforcer return app; } - private void removeApp(FSSchedulerApp app) { + private void removeApp(FSAppAttempt app) { app.getQueue().removeApp(app); maxAppsEnforcer.untrackRunnableApp(app); maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue()); @@ -93,7 +94,7 @@ public class TestMaxRunningAppsEnforcer queueMaxApps.put("root", 2); queueMaxApps.put("root.queue1", 1); queueMaxApps.put("root.queue2", 1); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -110,7 +111,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -128,7 +129,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); queueMaxApps.put("root.queue1.leaf1", 2); userMaxApps.put("user1", 1); - FSSchedulerApp app1 = addApp(leaf1, "user1"); + FSAppAttempt app1 = addApp(leaf1, "user1"); addApp(leaf1, "user2"); addApp(leaf1, "user3"); addApp(leaf2, "user1"); @@ -147,7 +148,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); clock.tick(20); @@ -167,7 +168,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); @@ -182,21 +183,18 @@ public class TestMaxRunningAppsEnforcer @Test public void testMultiListStartTimeIteratorEmptyAppLists() { - List> lists = new ArrayList>(); - lists.add(Arrays.asList(mockAppSched(1))); - lists.add(Arrays.asList(mockAppSched(2))); - Iterator iter = + List> lists = new ArrayList>(); + lists.add(Arrays.asList(mockAppAttempt(1))); + lists.add(Arrays.asList(mockAppAttempt(2))); + Iterator iter = new MaxRunningAppsEnforcer.MultiListStartTimeIterator(lists); - assertEquals(1, iter.next().getAppSchedulable().getStartTime()); - assertEquals(2, iter.next().getAppSchedulable().getStartTime()); + assertEquals(1, iter.next().getStartTime()); + assertEquals(2, iter.next().getStartTime()); } - private AppSchedulable mockAppSched(long startTime) { - AppSchedulable appSched = mock(AppSchedulable.class); - when(appSched.getStartTime()).thenReturn(startTime); - FSSchedulerApp schedApp = mock(FSSchedulerApp.class); - when(schedApp.getAppSchedulable()).thenReturn(appSched); - when(appSched.getApp()).thenReturn(schedApp); - return appSched; + private FSAppAttempt mockAppAttempt(long startTime) { + FSAppAttempt schedApp = mock(FSAppAttempt.class); + when(schedApp.getStartTime()).thenReturn(startTime); + return schedApp; } }