Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2D7C5200D42 for ; Fri, 17 Nov 2017 16:48:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2C2D5160C0A; Fri, 17 Nov 2017 15:48:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7CDEF160BF8 for ; Fri, 17 Nov 2017 16:48:27 +0100 (CET) Received: (qmail 82561 invoked by uid 500); 17 Nov 2017 15:48:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 82546 invoked by uid 99); 17 Nov 2017 15:48:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Nov 2017 15:48:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7F562E0D51; Fri, 17 Nov 2017 15:48:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: haibochen@apache.org To: common-commits@hadoop.apache.org Date: Fri, 17 Nov 2017 15:48:26 -0000 Message-Id: <510bf59b0a174abf9c148f228742ff73@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] hadoop git commit: YARN-1015. FS should watch node resource utilization and allocate opportunistic containers if appropriate. archived-at: Fri, 17 Nov 2017 15:48:29 -0000 Repository: hadoop Updated Branches: refs/heads/YARN-1011 a4cfabf28 -> 561410c78 http://git-wip-us.apache.org/repos/asf/hadoop/blob/561410c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 42d4f81..e70053c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -55,13 +55,19 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -71,6 +77,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -92,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -1054,15 +1063,15 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals( FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(2, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getVirtualCores()); + getGuaranteedResourceUsage().getVirtualCores()); // verify metrics QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1") @@ -1097,7 +1106,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -1107,7 +1116,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize()); // Now another node checks in with capacity @@ -1121,7 +1130,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure this goes to queue 2 assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // The old reservation should still be there... assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize()); @@ -1131,7 +1140,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { } - @Test (timeout = 5000) + @Test public void testOffSwitchAppReservationThreshold() throws Exception { conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f); scheduler.init(conf); @@ -1171,7 +1180,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify capacity allocation assertEquals(6144, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Create new app with a resource request that can be satisfied by any // node but would be @@ -1203,7 +1212,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -1264,7 +1273,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Create new app with a resource request that can be satisfied by any // node but would be @@ -1309,7 +1318,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); assertEquals(10240, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -1353,7 +1362,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Verify number of reservations have decremented assertEquals(0, @@ -1397,7 +1406,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1406,7 +1415,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); @@ -1532,7 +1541,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests likewise createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1541,7 +1550,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); @@ -1581,12 +1590,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure allocated memory of queue1 doesn't exceed its maximum assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); //the reservation of queue1 should be reclaim assertEquals(0, scheduler.getSchedulerApp(attId1). getCurrentReservation().getMemorySize()); assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); } @Test @@ -1626,7 +1635,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 1 is allocated app capacity assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // Now queue 2 requests below threshold ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -1635,7 +1644,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 has no reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(0, scheduler.getSchedulerApp(attId).getReservedContainers().size()); @@ -1646,7 +1655,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() .getVirtualCores()); @@ -1661,7 +1670,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure this goes to queue 2 assertEquals(3, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getVirtualCores()); + getGuaranteedResourceUsage().getVirtualCores()); // The old reservation should still be there... assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() @@ -2694,7 +2703,361 @@ public class TestFairScheduler extends FairSchedulerTestBase { 2, liveContainers.iterator().next().getContainer(). getPriority().getPriority()); } - + + /** + * Test that NO OPPORTUNISTIC containers can be allocated on a node that + * is fully allocated and with a very high utilization. + */ + @Test + public void testAllocateNoOpportunisticContainersOnBusyNode() + throws IOException { + conf.setBoolean( + YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(2048, 2), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up the node's full memory + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization shoots up after the container runs on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(2000, 0, 0.8f)); + + // create another scheduling request + ApplicationAttemptId appAttempt2 + = createSchedulingRequest(100, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue("Expecting no containers allocated", + allocatedContainers2.size() == 0); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + + // verify that a reservation is made for the second resource request + Resource reserved = scheduler.getNode(node.getNodeID()). + getReservedContainer().getReservedResource(); + assertTrue("Expect a reservation made for the second resource request", + reserved.equals(Resource.newInstance(100, 1))); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** + * Test that OPPORTUNISTIC containers can be allocated on a node with low + * utilization even though there is not enough unallocated resource on the + * node to accommodate the request. + */ + @Test + public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode() + throws IOException { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that leaves some unallocated resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3600, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3600, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1800, 0, 0.5f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + + // verify that no reservation is made for the second request given + // that it's satisfied by an OPPORTUNISTIC container allocation. + assertTrue("No reservation should be made because we have satisfied" + + " the second request with an OPPORTUNISTIC container allocation", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** + * Test opportunistic containers can be allocated on a node that is fully + * allocated but whose utilization is very low. + */ + @Test + public void testAllocateOpportunisticContainersOnFullyAllocatedNode() + throws IOException { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up the whole node + ApplicationAttemptId appAttempt1 = createSchedulingRequest( + 4096, "queue1", "user1", 4); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(1800, 0, 0.5f)); + + // create another scheduling request now that there is no unallocated + // resources left on the node, the request should be served with an + // allocation of an opportunistic container + ApplicationAttemptId appAttempt2 = createSchedulingRequest( + 1024, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getMemorySize()); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + + // verify that no reservation is made for the second request given + // that it's satisfied by an OPPORTUNISTIC container allocation. + assertTrue("No reservation should be made because we have satisfied" + + " the second request with an OPPORTUNISTIC container allocation", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** + * Test opportunistic containers can be allocated on a node with a low + * utilization even though there are GUARANTEED containers allocated. + */ + @Test + public void testAllocateOpportunisticContainersWithGuaranteedOnes() + throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(3200, "queue1", "user1", 3); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3200, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // node utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersAndNodeUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(512, 0, 0.1f)); + + // create two other scheduling requests which in aggregate ask for more + // that what's left unallocated on the node. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(512, "queue2", "user1", 1); + ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1024, "queue3", "user1", 1); + + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(512, scheduler.getQueueManager().getQueue("queue2"). + getGuaranteedResourceUsage().getMemorySize()); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers2.get(0).getExecutionType()); + + List allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers3.get(0).getExecutionType()); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getMemorySize()); + + // verify that no reservation is made given that the second request should + // be satisfied by a GUARANTEED container allocation, the third by an + // OPPORTUNISTIC container allocation. + assertTrue("No reservation should be made.", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + @Test public void testAclSubmitApplication() throws Exception { // Set acl's @@ -3684,7 +4047,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { .createAbnormalContainerStatus(container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.FINISHED); - assertEquals(Resources.none(), app1.getResourceUsage()); + assertEquals(Resources.none(), app1.getGuaranteedResourceUsage()); } @Test @@ -3784,7 +4147,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application1's AM should be finished", 0, app1.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app1.getResourceUsage()); + Resources.none(), app1.getGuaranteedResourceUsage()); assertEquals("Application3's AM should be running", 1, app3.getLiveContainers().size()); assertEquals("Application3's AM requests 1024 MB memory", @@ -3804,7 +4167,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application4's AM should not be running", 0, app4.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app4.getResourceUsage()); + Resources.none(), app4.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3820,7 +4183,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3833,7 +4196,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); @@ -3849,11 +4212,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application2's AM should be finished", 0, app2.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app2.getResourceUsage()); + Resources.none(), app2.getGuaranteedResourceUsage()); assertEquals("Application3's AM should be finished", 0, app3.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app3.getResourceUsage()); + Resources.none(), app3.getGuaranteedResourceUsage()); assertEquals("Application5's AM should be running", 1, app5.getLiveContainers().size()); assertEquals("Application5's AM requests 2048 MB memory", @@ -3874,7 +4237,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application5's AM should have 0 container", 0, app5.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app5.getResourceUsage()); + Resources.none(), app5.getGuaranteedResourceUsage()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemorySize()); scheduler.update(); @@ -3898,7 +4261,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Application6's AM should not be running", 0, app6.getLiveContainers().size()); assertEquals("Finished application usage should be none", - Resources.none(), app6.getResourceUsage()); + Resources.none(), app6.getGuaranteedResourceUsage()); assertEquals("Application6's AM resource shouldn't be updated", 0, app6.getAMResource().getMemorySize()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -4614,17 +4977,25 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true); FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true); - Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB); + Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2"); - Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB); - Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0); - Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0); + Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(), + 1 * GB); + Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(), + 0); + Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(), + 0); } @Test (expected = YarnException.class) @@ -4664,7 +5035,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(updateEvent); scheduler.handle(updateEvent); - assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage()); + assertEquals(Resource.newInstance(2048, 2), + oldQueue.getGuaranteedResourceUsage()); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } @@ -5088,7 +5460,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(new NodeUpdateSchedulerEvent(node2)); assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); //container will be reserved at node1 RMContainer reservedContainer1 = @@ -5108,7 +5480,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { app1, RMAppAttemptState.KILLED, false)); assertEquals(0, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + getGuaranteedResourceUsage().getMemorySize()); // container will be allocated at node2 scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -5256,10 +5628,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app1 = mock(FSAppAttempt.class); Mockito.when(app1.getDemand()).thenReturn(maxResource); - Mockito.when(app1.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app1.getGuaranteedResourceUsage()). + thenReturn(Resources.none()); FSAppAttempt app2 = mock(FSAppAttempt.class); Mockito.when(app2.getDemand()).thenReturn(maxResource); - Mockito.when(app2.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app2.getGuaranteedResourceUsage()). + thenReturn(Resources.none()); QueueManager queueManager = scheduler.getQueueManager(); FSParentQueue queue1 = queueManager.getParentQueue("queue1", true); @@ -5315,7 +5689,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { child1.setMaxShare(new ConfigurableResource(resource)); FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(resource); - Mockito.when(app.getResourceUsage()).thenReturn(resource); + Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource); child1.addApp(app, true); child1.updateDemand(); @@ -5351,7 +5725,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { + " SteadyFairShare: ," + " MaxShare: ," + " MinShare: ," - + " ResourceUsage: ," + + " Guaranteed ResourceUsage: ," + " Demand: ," + " MaxAMShare: 0.5," + " Runnable: 0}"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/561410c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index b016c1b..6777b5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -243,11 +243,16 @@ public class TestSchedulingPolicy { } @Override - public Resource getResourceUsage() { + public Resource getGuaranteedResourceUsage() { return usage; } @Override + public Resource getOpportunisticResourceUsage() { + return Resource.newInstance(0, 0); + } + + @Override public Resource getMinShare() { return minShare; } @@ -278,7 +283,8 @@ public class TestSchedulingPolicy { } @Override - public Resource assignContainer(FSSchedulerNode node) { + public Resource assignContainer(FSSchedulerNode node, + boolean opportunistic) { throw new UnsupportedOperationException(); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org