Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B0D417AF0 for ; Wed, 23 Sep 2015 20:33:01 +0000 (UTC) Received: (qmail 71720 invoked by uid 500); 23 Sep 2015 20:32:55 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 71406 invoked by uid 500); 23 Sep 2015 20:32:54 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 70489 invoked by uid 99); 23 Sep 2015 20:32:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Sep 2015 20:32:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C4DBE0244; Wed, 23 Sep 2015 20:32:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Wed, 23 Sep 2015 20:33:08 -0000 Message-Id: <14b5b83d66ee46c7bbd11f1450e87137@git.apache.org> In-Reply-To: <0566134e15834fbf968df1619e8ad8cc@git.apache.org> References: <0566134e15834fbf968df1619e8ad8cc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/21] hadoop git commit: YARN-1651. CapacityScheduler side changes to support container resize. 
Contributed by Wangda Tan http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 85d2515..8fa1ad2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,44 +18,51 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import static java.lang.Thread.sleep; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; - +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; -import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; - +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.Assert; - -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static java.lang.Thread.sleep; public class TestApplicationMasterService { private static final Log LOG = LogFactory @@ -343,6 +350,92 @@ public class TestApplicationMasterService { alloc1Response = am1.schedule(); Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size()); } + + @Test(timeout=60000) + public void testInvalidIncreaseDecreaseRequest() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + + try { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(1024); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + RegisterApplicationMasterResponse registerResponse = + am1.registerAppAttempt(); + + sentRMContainerLaunched(rm, + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1)); + + // Ask for a normal increase should be successfull + am1.sendContainerResizingRequest(Arrays.asList( + 
ContainerResourceChangeRequest.newInstance( + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources.createResource(2048))), null); + + // Target resource is negative, should fail + boolean exceptionCaught = false; + try { + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest.newInstance( + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources.createResource(-1))), null); + } catch (InvalidResourceRequestException e) { + // This is expected + exceptionCaught = true; + } + Assert.assertTrue(exceptionCaught); + + // Target resource is more than maxAllocation, should fail + try { + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest.newInstance( + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources + .add(registerResponse.getMaximumResourceCapability(), + Resources.createResource(1)))), null); + } catch (InvalidResourceRequestException e) { + // This is expected + exceptionCaught = true; + } + + Assert.assertTrue(exceptionCaught); + + // Contains multiple increase/decrease requests for same contaienrId + try { + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest.newInstance( + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources + .add(registerResponse.getMaximumResourceCapability(), + Resources.createResource(1)))), Arrays.asList( + ContainerResourceChangeRequest.newInstance( + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + Resources + .add(registerResponse.getMaximumResourceCapability(), + Resources.createResource(1))))); + } catch (InvalidResourceRequestException e) { + // This is expected + exceptionCaught = true; + } + + Assert.assertTrue(exceptionCaught); + } finally { + if (rm != null) { + rm.close(); + } + } + } private static class MyResourceManager extends MockRM { @@ -354,4 +447,15 @@ public class TestApplicationMasterService { return new DrainDispatcher(); } } + + private void 
sentRMContainerLaunched(MockRM rm, ContainerId containerId) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMContainer rmContainer = cs.getRMContainer(containerId); + if (rmContainer != null) { + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + } else { + Assert.fail("Cannot find RMContainer"); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index dc843b9..168280a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -331,11 +332,15 @@ public class TestAMRestart { MockAM am2 = MockRM.launchAM(app1, rm1, nm1); RegisterApplicationMasterResponse registerResponse = am2.registerAppAttempt(); - 
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING); // check am2 get the nm token from am1. - Assert.assertEquals(expectedNMTokens, - registerResponse.getNMTokensFromPreviousAttempts()); + Assert.assertEquals(expectedNMTokens.size(), + registerResponse.getNMTokensFromPreviousAttempts().size()); + for (int i = 0; i < expectedNMTokens.size(); i++) { + Assert.assertTrue(expectedNMTokens.get(i) + .equals(registerResponse.getNMTokensFromPreviousAttempts().get(i))); + } // am2 allocate 1 container on nm2 containers = new ArrayList(); @@ -365,7 +370,7 @@ public class TestAMRestart { // restart am MockAM am3 = MockRM.launchAM(app1, rm1, nm1); registerResponse = am3.registerAppAttempt(); - rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.RUNNING); // check am3 get the NM token from both am1 and am2; List transferredTokens = registerResponse.getNMTokensFromPreviousAttempts(); @@ -430,7 +435,7 @@ public class TestAMRestart { ContainerStatus containerStatus = BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE, - "", ContainerExitStatus.DISKS_FAILED); + "", ContainerExitStatus.DISKS_FAILED, Resources.createResource(200)); currentNode.containerStatus(containerStatus); am1.waitForState(RMAppAttemptState.FAILED); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 484a1b6..1f307aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -165,7 +165,7 @@ public class TestRMAppLogAggregationStatus { node1ReportForApp.add(report1); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, - null, node1ReportForApp)); + null, node1ReportForApp, null)); List node2ReportForApp = new ArrayList(); @@ -177,7 +177,7 @@ public class TestRMAppLogAggregationStatus { node2ReportForApp.add(report2); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, - null, node2ReportForApp)); + null, node2ReportForApp, null)); // node1 and node2 has updated its log aggregation status // verify that the log aggregation status for node1, node2 // has been changed @@ -215,7 +215,7 @@ public class TestRMAppLogAggregationStatus { node1ReportForApp2.add(report1_2); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, - null, node1ReportForApp2)); + null, node1ReportForApp2, null)); // verify that the log aggregation status for node1 // has been changed @@ -284,7 +284,7 @@ public class 
TestRMAppLogAggregationStatus { // 10 diagnostic messages/failure messages node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, - null, node1ReportForApp3)); + null, node1ReportForApp3, null)); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); Assert.assertEquals(2, logAggregationStatus.size()); @@ -329,7 +329,7 @@ public class TestRMAppLogAggregationStatus { node2ReportForApp2.add(report2_3); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, - null, node2ReportForApp2)); + null, node2ReportForApp2, null)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport()); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 10ec453..828e149 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -465,10 +465,9 
@@ public class TestRMAppAttemptTransitions { expectedAllocateCount = 1; } - assertEquals(expectedState, - applicationAttempt.getAppAttemptState()); - verify(scheduler, times(expectedAllocateCount)). - allocate(any(ApplicationAttemptId.class), + assertEquals(expectedState, applicationAttempt.getAppAttemptState()); + verify(scheduler, times(expectedAllocateCount)).allocate( + any(ApplicationAttemptId.class), any(List.class), any(List.class), any(List.class), any(List.class), any(List.class), any(List.class)); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -488,11 +487,9 @@ public class TestRMAppAttemptTransitions { assertEquals(amContainer, applicationAttempt.getMasterContainer()); // Check events verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class)); - verify(scheduler, times(2)). - allocate( - any( - ApplicationAttemptId.class), any(List.class), any(List.class), - any(List.class), any(List.class)); + verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class), + any(List.class), any(List.class), any(List.class), any(List.class), + any(List.class), any(List.class)); verify(nmTokenManager).clearNodeSetForAttempt( applicationAttempt.getAppAttemptId()); } @@ -641,13 +638,9 @@ public class TestRMAppAttemptTransitions { Allocation allocation = mock(Allocation.class); when(allocation.getContainers()). thenReturn(Collections.singletonList(container)); - when( - scheduler.allocate( - any(ApplicationAttemptId.class), - any(List.class), - any(List.class), - any(List.class), - any(List.class))). + when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), + any(List.class), any(List.class), any(List.class), any(List.class), + any(List.class))). thenReturn(allocation); RMContainer rmContainer = mock(RMContainerImpl.class); when(scheduler.getRMContainer(container.getId())). 
@@ -1511,10 +1504,9 @@ public class TestRMAppAttemptTransitions { @Test public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() { YarnScheduler mockScheduler = mock(YarnScheduler.class); - when( - mockScheduler.allocate(any(ApplicationAttemptId.class), - any(List.class), any(List.class), any(List.class), any(List.class))) - .thenAnswer(new Answer() { + when(mockScheduler.allocate(any(ApplicationAttemptId.class), + any(List.class), any(List.class), any(List.class), any(List.class), + any(List.class), any(List.class))).thenAnswer(new Answer() { @SuppressWarnings("rawtypes") @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index e4e2049..415e891 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doReturn; import static 
org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -191,6 +190,10 @@ public class TestRMContainerImpl { Container container = BuilderUtils.newContainer(containerId, nodeId, "host:3465", resource, priority, null); + + ConcurrentMap appMap = new ConcurrentHashMap<>(); + RMApp rmApp = mock(RMApp.class); + appMap.putIfAbsent(appId, rmApp); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); @@ -200,6 +203,7 @@ public class TestRMContainerImpl { when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); + when(rmContext.getRMApps()).thenReturn(appMap); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, "user", rmContext); @@ -235,11 +239,118 @@ public class TestRMContainerImpl { rmContainer.handle(new RMContainerFinishedEvent(containerId, containerStatus, RMContainerEventType.EXPIRE)); drainDispatcher.await(); + assertEquals(RMContainerState.EXPIRED, rmContainer.getState()); + verify(writer, times(1)).containerFinished(any(RMContainer.class)); + verify(publisher, times(1)).containerFinished(any(RMContainer.class), + anyLong()); + } + + private void testExpireAfterIncreased(boolean acquired) { + /* + * Similar to previous test, a container is increased but not acquired by + * AM. In this case, if a container is expired, the container should be + * finished. 
+ */ + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler appAttemptEventHandler = + mock(EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, + appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + + Resource resource = BuilderUtils.newResource(512, 1); + Priority priority = BuilderUtils.newPriority(5); + + Container container = BuilderUtils.newContainer(containerId, nodeId, + "host:3465", resource, priority, null); + + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); + ConcurrentMap apps = + new ConcurrentHashMap(); + apps.put(appId, mock(RMApp.class)); + when(rmContext.getRMApps()).thenReturn(apps); + RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + nodeId, "user", rmContext); + + assertEquals(RMContainerState.NEW, rmContainer.getState()); + assertEquals(resource, rmContainer.getAllocatedResource()); + assertEquals(nodeId, rmContainer.getAllocatedNode()); + assertEquals(priority, 
rmContainer.getAllocatedPriority()); + verify(writer).containerStarted(any(RMContainer.class)); + verify(publisher).containerCreated(any(RMContainer.class), anyLong()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.ACQUIRED)); + drainDispatcher.await(); + assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - verify(writer, never()).containerFinished(any(RMContainer.class)); - verify(publisher, never()).containerFinished(any(RMContainer.class), + assertEquals( + "http://host:3465/node/containerlogs/container_1_0001_01_000001/user", + rmContainer.getLogURL()); + + // newResource is more than the old resource + Resource newResource = BuilderUtils.newResource(1024, 2); + rmContainer.handle(new RMContainerChangeResourceEvent(containerId, + newResource, true)); + + if (acquired) { + rmContainer + .handle(new RMContainerUpdatesAcquiredEvent(containerId, true)); + drainDispatcher.await(); + // status is still RUNNING since this is a increased container acquired by + // AM + assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + } + + // In RUNNING state. Verify EXPIRE and associated actions. 
+ reset(appAttemptEventHandler); + ContainerStatus containerStatus = SchedulerUtils + .createAbnormalContainerStatus(containerId, + SchedulerUtils.EXPIRED_CONTAINER); + rmContainer.handle(new RMContainerFinishedEvent(containerId, + containerStatus, RMContainerEventType.EXPIRE)); + drainDispatcher.await(); + assertEquals(RMContainerState.EXPIRED, rmContainer.getState()); + + // Container will be finished only when it is acquired by AM after increase, + // we will only notify expirer when it is acquired by AM. + verify(writer, times(1)).containerFinished(any(RMContainer.class)); + verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong()); } + + @Test + public void testExpireAfterContainerResourceIncreased() throws Exception { + // expire after increased and acquired by AM + testExpireAfterIncreased(true); + // expire after increased but not acquired by AM + testExpireAfterIncreased(false); + } @Test public void testExistenceOfResourceRequestInRMContainer() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 88c1444..7f6a749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -31,7 +31,6 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -103,6 +103,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -139,7 +141,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -678,11 +679,11 @@ public class 
TestCapacityScheduler { // Verify the blacklist can be updated independent of requesting containers cs.allocate(appAttemptId, Collections.emptyList(), Collections.emptyList(), - Collections.singletonList(host), null); + Collections.singletonList(host), null, null, null); Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); cs.allocate(appAttemptId, Collections.emptyList(), Collections.emptyList(), null, - Collections.singletonList(host)); + Collections.singletonList(host), null, null); Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); rm.stop(); } @@ -777,7 +778,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId1, Collections.singletonList(r1), Collections.emptyList(), - null, null); + null, null, null, null); //And this will result in container assignment for app1 CapacityScheduler.schedule(cs); @@ -794,7 +795,7 @@ public class TestCapacityScheduler { cs.allocate(appAttemptId2, Collections.singletonList(r2), Collections.emptyList(), - null, null); + null, null, null, null); //In this case we do not perform container assignment because we want to //verify re-ordering based on the allocation alone @@ -2907,7 +2908,7 @@ public class TestCapacityScheduler { Allocation allocate = cs.allocate(appAttemptId, Collections. emptyList(), - Collections. emptyList(), null, null); + Collections. emptyList(), null, null, null, null); Assert.assertNotNull(attempt); @@ -2923,7 +2924,7 @@ public class TestCapacityScheduler { allocate = cs.allocate(appAttemptId, Collections. emptyList(), - Collections. emptyList(), null, null); + Collections. 
emptyList(), null, null, null, null); // All resources should be sent as headroom Assert.assertEquals(newResource, allocate.getResourceLimit()); @@ -3084,7 +3085,107 @@ public class TestCapacityScheduler { config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, DominantResourceCalculator.class.getName()); verifyAMLimitForLeafQueue(config); + } + + private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, + ApplicationId appId) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); + } + @Test + public void testPendingResourceUpdatedAccordingToIncreaseRequestChanges() + throws Exception { + Configuration conf = + TestUtils.getConfigurationWithQueueLabels(new Configuration(false)); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm = new MockRM(conf, memStore) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.start(); + + MockNM nm1 = // label = "" + new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + + // Launch app1 in queue=a1 + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // Allocate two more containers + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(2 * GB), 2)), + null); + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + ContainerId containerId3 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + Assert.assertTrue(rm.waitForState(nm1, containerId3, + RMContainerState.ALLOCATED, 10 * 1000)); + // 
Acquire them + am1.allocate(null, null); + sentRMContainerLaunched(rm, + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1L)); + sentRMContainerLaunched(rm, + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L)); + sentRMContainerLaunched(rm, + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3L)); + + // am1 asks to change its AM container from 1GB to 3GB + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(3 * GB))), + null); + + FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); + + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + checkPendingResource(rm, "a1", 2 * GB, null); + checkPendingResource(rm, "a", 2 * GB, null); + checkPendingResource(rm, "root", 2 * GB, null); + + // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G) + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId2, Resources.createResource(3 * GB)), + ContainerResourceChangeRequest + .newInstance(containerId3, Resources.createResource(5 * GB))), + null); + + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + checkPendingResource(rm, "a1", 6 * GB, null); + checkPendingResource(rm, "a", 6 * GB, null); + checkPendingResource(rm, "root", 6 * GB, null); + + // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and + // containerId3 (2G -> 2G) + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(3 * GB)), + ContainerResourceChangeRequest + .newInstance(containerId2, Resources.createResource(4 * GB)), + ContainerResourceChangeRequest + .newInstance(containerId3, Resources.createResource(2 * GB))), + null); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + 
checkPendingResource(rm, "a1", 4 * GB, null); + checkPendingResource(rm, "a", 4 * GB, null); + checkPendingResource(rm, "root", 4 * GB, null); } private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config) @@ -3146,4 +3247,15 @@ public class TestCapacityScheduler { + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; conf.setInt(propName, maxAllocVcores); } + + private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMContainer rmContainer = cs.getRMContainer(containerId); + if (rmContainer != null) { + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + } else { + Assert.fail("Cannot find RMContainer"); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 9dcab2e..88c7c13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -132,11 +132,11 @@ public class TestChildQueueOrder { final Resource 
allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource, RMNodeLabelsManager.NO_LABEL); + allocatedResource, RMNodeLabelsManager.NO_LABEL, false); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource, null, null); + allocatedResource, null, null, false); } // Next call - nothing http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 769041b..b5b2222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -60,9 +59,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - public class TestContainerAllocation { @@ -199,13 +195,16 @@ public class TestContainerAllocation { // acquire the container. SecurityUtilTestHelper.setTokenServiceUseIp(true); - List containers = - am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers(); - // not able to fetch the container; - Assert.assertEquals(0, containers.size()); - - SecurityUtilTestHelper.setTokenServiceUseIp(false); + List containers; + try { + containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // not able to fetch the container; + Assert.assertEquals(0, containers.size()); + } finally { + SecurityUtilTestHelper.setTokenServiceUseIp(false); + } containers = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); @@ -315,21 +314,24 @@ public class TestContainerAllocation { rm1.start(); MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); - SecurityUtilTestHelper.setTokenServiceUseIp(true); - RMApp app1 = rm1.submitApp(200); - RMAppAttempt attempt = app1.getCurrentAppAttempt(); - nm1.nodeHeartbeat(true); - - // fetching am container will fail, keep retrying 5 times. - while (numRetries <= 5) { + RMApp app1; + try { + SecurityUtilTestHelper.setTokenServiceUseIp(true); + app1 = rm1.submitApp(200); + RMAppAttempt attempt = app1.getCurrentAppAttempt(); nm1.nodeHeartbeat(true); - Thread.sleep(1000); - Assert.assertEquals(RMAppAttemptState.SCHEDULED, - attempt.getAppAttemptState()); - System.out.println("Waiting for am container to be allocated."); - } - SecurityUtilTestHelper.setTokenServiceUseIp(false); + // fetching am container will fail, keep retrying 5 times. 
+ while (numRetries <= 5) { + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, + attempt.getAppAttemptState()); + System.out.println("Waiting for am container to be allocated."); + } + } finally { + SecurityUtilTestHelper.setTokenServiceUseIp(false); + } MockRM.launchAndRegisterAM(app1, rm1, nm1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java new file mode 100644 index 0000000..23283f6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -0,0 +1,963 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestContainerResizing { + private final int GB = 1024; + + private YarnConfiguration conf; + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @Test + public void testSimpleIncreaseContainer() throws Exception { + /** + * Application has a container running, and the node has enough available + * resource. 
Add a increase request to see if container will be increased + */ + MockRM rm1 = new MockRM() { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); + + // app1 -> a1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + sentRMContainerLaunched(rm1, containerId1); + // am1 asks to change its AM container from 1GB to 3GB + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(3 * GB))), + null); + + FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + + checkPendingResource(rm1, "default", 2 * GB, null); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + + // NM1 do 1 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Pending resource should be deducted + checkPendingResource(rm1, "default", 0 * GB, null); + Assert.assertEquals(0 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + + verifyContainerIncreased(am1.allocate(null, null), containerId1, 3 * GB); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 17 * GB); + + rm1.close(); + } + + @Test + public void testSimpleDecreaseContainer() throws Exception { + /** + * Application has a container running, try to decrease the container and + * check queue's usage and container resource will be updated. 
+ */ + MockRM rm1 = new MockRM() { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); + + // app1 -> a1 + RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + + checkUsedResource(rm1, "default", 3 * GB, null); + Assert.assertEquals(3 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + sentRMContainerLaunched(rm1, containerId1); + + // am1 asks to change its AM container from 1GB to 3GB + AllocateResponse response = am1.sendContainerResizingRequest(null, Arrays + .asList(ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(1 * GB)))); + + verifyContainerDecreased(response, containerId1, 1 * GB); + checkUsedResource(rm1, "default", 1 * GB, null); + Assert.assertEquals(1 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + + // Check if decreased containers added to RMNode + RMNodeImpl rmNode = + (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + Collection decreasedContainers = + rmNode.getToBeDecreasedContainers(); + boolean rmNodeReceivedDecreaseContainer = false; + for (Container c : decreasedContainers) { + if (c.getId().equals(containerId1) + && c.getResource().equals(Resources.createResource(1 * GB))) { + rmNodeReceivedDecreaseContainer = true; + } + } + Assert.assertTrue(rmNodeReceivedDecreaseContainer); + + rm1.close(); + } + + @Test + public void testSimpleIncreaseRequestReservation() throws Exception { + /** + * Application has two containers running, try to increase one of then, node + * doesn't have enough resource, so the increase request will be reserved. 
+ * Check resource usage after container reserved, finish a container, the + * reserved container should be allocated. + */ + MockRM rm1 = new MockRM() { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + + // app1 -> a1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + + // Allocate two more containers + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(2 * GB), 1)), + null); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm1.waitForState(nm1, containerId2, + RMContainerState.ALLOCATED, 10 * 1000)); + // Acquire them, and NM report RUNNING + am1.allocate(null, null); + sentRMContainerLaunched(rm1, containerId2); + + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + sentRMContainerLaunched(rm1, containerId1); + + + // am1 asks to change its AM container from 1GB to 3GB + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(7 * GB))), + null); + + checkPendingResource(rm1, "default", 6 * GB, null); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + + // NM1 do 1 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); + + /* Check reservation statuses */ + // Increase request should be reserved + 
Assert.assertTrue(rmContainer1.hasIncreaseReservation()); + Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemory()); + Assert.assertFalse(app.getReservedContainers().isEmpty()); + Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Pending resource will not be changed since it's not satisfied + checkPendingResource(rm1, "default", 6 * GB, null); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + // Queue/user/application's usage will be updated + checkUsedResource(rm1, "default", 9 * GB, null); + Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default")) + .getUser("user").getUsed().getMemory()); + Assert.assertEquals(3 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getReserved().getMemory()); + + // Complete one container and do another allocation + am1.allocate(null, Arrays.asList(containerId2)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Now container should be increased + verifyContainerIncreased(am1.allocate(null, null), containerId1, 7 * GB); + + /* Check statuses after reservation satisfied */ + // Increase request should be unreserved + Assert.assertFalse(rmContainer1.hasIncreaseReservation()); + Assert.assertTrue(app.getReservedContainers().isEmpty()); + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Pending resource will be changed since it's satisfied + checkPendingResource(rm1, "default", 0 * GB, null); + Assert.assertEquals(0 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + // Queue/user/application's usage will be updated + checkUsedResource(rm1, "default", 7 * GB, null); + Assert.assertEquals(7 * GB, ((LeafQueue) cs.getQueue("default")) + .getUser("user").getUsed().getMemory()); + Assert.assertEquals(0 * GB, + app.getAppAttemptResourceUsage().getReserved().getMemory()); + Assert.assertEquals(7 * GB, + 
app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 1 * GB); + + rm1.close(); + } + + @Test + public void testExcessiveReservationWhenCancelIncreaseRequest() + throws Exception { + /** + * Application has two containers running, try to increase one of then, node + * doesn't have enough resource, so the increase request will be reserved. + * Check resource usage after container reserved, finish a container & + * cancel the increase request, reservation should be cancelled + */ + MockRM rm1 = new MockRM() { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + + // app1 -> a1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + + // Allocate two more containers + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(2 * GB), 1)), + null); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm1.waitForState(nm1, containerId2, + RMContainerState.ALLOCATED, 10 * 1000)); + // Acquire them, and NM report RUNNING + am1.allocate(null, null); + sentRMContainerLaunched(rm1, containerId2); + + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + sentRMContainerLaunched(rm1, containerId1); + + // am1 asks to change its AM container from 1GB to 3GB + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(7 * GB))), + null); + + checkPendingResource(rm1, "default", 6 * GB, null); + Assert.assertEquals(6 * GB, + 
app.getAppAttemptResourceUsage().getPending().getMemory()); + + // NM1 do 1 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); + + /* Check reservation statuses */ + // Increase request should be reserved + Assert.assertTrue(rmContainer1.hasIncreaseReservation()); + Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemory()); + Assert.assertFalse(app.getReservedContainers().isEmpty()); + Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Pending resource will not be changed since it's not satisfied + checkPendingResource(rm1, "default", 6 * GB, null); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + // Queue/user/application's usage will be updated + checkUsedResource(rm1, "default", 9 * GB, null); + Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default")) + .getUser("user").getUsed().getMemory()); + Assert.assertEquals(3 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getReserved().getMemory()); + + // Complete one container and cancel increase request (via send a increase + // request, make target_capacity=existing_capacity) + am1.allocate(null, Arrays.asList(containerId2)); + // am1 asks to change its AM container from 1G to 1G (cancel the increase + // request actually) + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(1 * GB))), + null); + // Trigger a node heartbeat.. 
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + /* Check statuses after reservation satisfied */ + // Increase request should be unreserved + Assert.assertTrue(app.getReservedContainers().isEmpty()); + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertFalse(rmContainer1.hasIncreaseReservation()); + // Pending resource will be changed since it's satisfied + checkPendingResource(rm1, "default", 0 * GB, null); + Assert.assertEquals(0 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + // Queue/user/application's usage will be updated + checkUsedResource(rm1, "default", 1 * GB, null); + Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default")) + .getUser("user").getUsed().getMemory()); + Assert.assertEquals(0 * GB, + app.getAppAttemptResourceUsage().getReserved().getMemory()); + Assert.assertEquals(1 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + + rm1.close(); + } + + @Test + public void testExcessiveReservationWhenDecreaseSameContainer() + throws Exception { + /** + * Very similar to testExcessiveReservationWhenCancelIncreaseRequest, after + * the increase request reserved, it decreases the reserved container, + * container should be decreased and reservation will be cancelled + */ + MockRM rm1 = new MockRM() { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + + // app1 -> a1 + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + + // Allocate two more containers + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(2 * GB), 1)), + null); + ContainerId containerId2 = + 
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm1.waitForState(nm1, containerId2, + RMContainerState.ALLOCATED, 10 * 1000)); + // Acquire them, and NM report RUNNING + am1.allocate(null, null); + sentRMContainerLaunched(rm1, containerId2); + + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + sentRMContainerLaunched(rm1, containerId1); + + + // am1 asks to change its AM container from 2GB to 8GB + am1.sendContainerResizingRequest(Arrays.asList( + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(8 * GB))), + null); + + checkPendingResource(rm1, "default", 6 * GB, null); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + + // NM1 do 1 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); + + /* Check reservation statuses */ + // Increase request should be reserved + Assert.assertTrue(rmContainer1.hasIncreaseReservation()); + Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemory()); + Assert.assertFalse(app.getReservedContainers().isEmpty()); + Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + // Pending resource will not be changed since it's not satisfied + checkPendingResource(rm1, "default", 6 * GB, null); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getPending().getMemory()); + // Queue/user/application's usage will be updated + checkUsedResource(rm1, "default", 10 * GB, null); + Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default")) + .getUser("user").getUsed().getMemory()); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + Assert.assertEquals(6 * GB, + 
app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Complete one container and cancel increase request (via send a increase
+    // request, make target_capacity=existing_capacity)
+    am1.allocate(null, Arrays.asList(containerId2));
+    // am1 asks to change its AM container from 2G to 1G (decrease)
+    am1.sendContainerResizingRequest(null, Arrays.asList(
+        ContainerResourceChangeRequest
+            .newInstance(containerId1, Resources.createResource(1 * GB))));
+    // Trigger a node heartbeat..
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    /* Check statuses after reservation satisfied */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+    // Pending resource will be changed since it's satisfied
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 1 * GB, null);
+    Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(1 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIncreaseContainerUnreservedWhenContainerCompleted()
+      throws Exception {
+    /**
+     * App has two containers on the same node (node.resource = 8G), container1
+     * (AM) = 1G, container2 = 2G. App asks to increase container2 to 8G.
+     *
+     * So increase container request will be reserved. When app releases
+     * container2, reserved part should be released as well.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container (container2, 2G)
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire it, and NM report RUNNING
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+    rm1.waitForContainerState(containerId2, RMContainerState.RUNNING);
+
+    // am1 asks to change container2 from 2GB to 8GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+        ContainerResourceChangeRequest
+            .newInstance(containerId2, Resources.createResource(8 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // One NM1 heartbeat: 5G free < 6G needed, so the increase is reserved
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
+
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer2.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user usage counts the 6G reservation; app used stays 3G
+    checkUsedResource(rm1, "default", 9 * GB, null);
+    Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Complete container2, container will be unreserved and completed
+    am1.allocate(null, Arrays.asList(containerId2));
+
+    /* Check statuses after container2 completed */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer2.hasIncreaseReservation());
+    // The pending increase is dropped together with container2
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Only the 1G AM container is still in use
+    checkUsedResource(rm1, "default", 1 * GB, null);
+    Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(1 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIncreaseContainerUnreservedWhenApplicationCompleted()
+      throws Exception {
+    /**
+     * Similar to testIncreaseContainerUnreservedWhenContainerCompleted, when
+     * application finishes, reserved increase container should be cancelled
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container (container2, 2G)
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(
+        rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED,
+            10 * 1000));
+    // Acquire it, NM reports RUNNING (RUNNING state is not awaited here)
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+
+    // am1 asks to change container2 from 2GB to 8GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+        ContainerResourceChangeRequest
+            .newInstance(containerId2, Resources.createResource(8 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // One NM1 heartbeat: 5G free < 6G needed, so the increase is reserved
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
+
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer2.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user usage counts the 6G reservation; app used stays 3G
+    checkUsedResource(rm1, "default", 9 * GB, null);
+    Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Remove the app attempt as KILLED
+    cs.handle(new AppAttemptRemovedSchedulerEvent(am1.getApplicationAttemptId(),
+        RMAppAttemptState.KILLED, false));
+
+    /* Check statuses after the attempt is removed */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer2.hasIncreaseReservation());
+    // The pending increase is dropped with the attempt
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // All queue/user/app usage is released, including the AM container
+    checkUsedResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
+      int nContainer, int mem, int priority, int startContainerId)
+      throws Exception { // assumes ids run from startContainerId upward
+    am.allocate(Arrays
+        .asList(ResourceRequest.newInstance(Priority.newInstance(priority), "*",
+            Resources.createResource(mem), nContainer)),
+        null);
+    ContainerId lastContainerId = ContainerId.newContainerId(
+        am.getApplicationAttemptId(), startContainerId + nContainer - 1);
+    Assert.assertTrue(rm.waitForState(nm, lastContainerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am.allocate(null, null);
+
+    for (int cId = startContainerId; cId < startContainerId
+        + nContainer; cId++) {
+      sentRMContainerLaunched(rm,
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId));
+      rm.waitForContainerState(
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId),
+          RMContainerState.RUNNING);
+    }
+  }
+
+  @Test
+  public void testOrderOfIncreaseContainerRequestAllocation()
+      throws Exception {
+    /**
+     * Several containers need to be increased; check that increases are served
+     * by priority (lower value first) and, for equal priority, the container
+     * with the smaller containerId is preferred.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
+
+    // Container 2, 3 (priority=3)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
+
+    // Container 4, 5 (priority=2)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
+
+    // Container 6, 7 (priority=4)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
+
+    // am1 asks to change its container[2-7] from 1G to 2G
+    List increaseRequests = new ArrayList<>();
+    for (int cId = 2; cId <= 7; cId++) {
+      ContainerId containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
+      increaseRequests.add(ContainerResourceChangeRequest
+          .newInstance(containerId, Resources.createResource(2 * GB)));
+    }
+    am1.sendContainerResizingRequest(increaseRequests, null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // Get rmNode1
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // One heartbeat: nm1 has 3G free (10G - 7G used), so only containers 4/5
+    // (priority=2) and then 2 (lower id at priority=3) are increased
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    AllocateResponse allocateResponse = am1.allocate(null, null);
+    Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size());
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 4), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 5), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 2), 2 * GB);
+
+    /* Check statuses after allocation */
+    // Increases for containers 3/6/7 (3 x 1G) are still pending
+    checkPendingResource(rm1, "default", 3 * GB, null);
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // nm1 is now full: 10G used, nothing reserved
+    checkUsedResource(rm1, "default", 10 * GB, null);
+    Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(10 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIncreaseContainerRequestGetPreferrence()
+      throws Exception {
+    /**
+     * Intended: with both increase and new-container requests pending, the
+     * scheduler should increase containers before allocating new ones.
+     * NOTE(review): no new request is sent; identical to the ordering test.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
+
+    // Container 2, 3 (priority=3)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
+
+    // Container 4, 5 (priority=2)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
+
+    // Container 6, 7 (priority=4)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
+
+    // am1 asks to change its container[2-7] from 1G to 2G
+    List increaseRequests = new ArrayList<>();
+    for (int cId = 2; cId <= 7; cId++) {
+      ContainerId containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
+      increaseRequests.add(ContainerResourceChangeRequest
+          .newInstance(containerId, Resources.createResource(2 * GB)));
+    }
+    am1.sendContainerResizingRequest(increaseRequests, null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // Get rmNode1
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // One heartbeat: nm1 has 3G free (10G - 7G used), so only containers 4/5
+    // (priority=2) and then 2 (lower id at priority=3) are increased
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    AllocateResponse allocateResponse = am1.allocate(null, null);
+    Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size());
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 4), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 5), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 2), 2 * GB);
+
+    /* Check statuses after allocation */
+    // Increases for containers 3/6/7 (3 x 1G) are still pending
+    checkPendingResource(rm1, "default", 3 * GB, null);
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // nm1 is now full: 10G used, nothing reserved
+    checkUsedResource(rm1, "default", 10 * GB, null);
+    Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(10 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  private void checkPendingResource(MockRM rm, String queueName, int memory,
+      String label) { // asserts queue's pending memory (null label = NO_LABEL)
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(memory,
+        queue.getQueueResourceUsage()
+            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) { // asserts queue's used memory (null label = NO_LABEL)
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(memory,
+        queue.getQueueResourceUsage()
+            .getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void verifyContainerIncreased(AllocateResponse response,
+      ContainerId containerId, int mem) { // expect containerId resized to mem
+    List increasedContainers = response.getIncreasedContainers();
+    boolean found = false;
+    for (Container c : increasedContainers) {
+      if (c.getId().equals(containerId)) {
+        found = true;
+        Assert.assertEquals(mem, c.getResource().getMemory());
+      }
+    }
+    if (!found) {
+      Assert.fail("Container not increased: containerId=" + containerId);
+    }
+  }
+
+  private void verifyContainerDecreased(AllocateResponse response,
+      ContainerId containerId, int mem) { // expect containerId resized to mem
+    List decreasedContainers = response.getDecreasedContainers();
+    boolean found = false;
+    for (Container c : decreasedContainers) {
+      if (c.getId().equals(containerId)) {
+        found = true;
+        Assert.assertEquals(mem, c.getResource().getMemory());
+      }
+    }
+    if (!found) {
+      Assert.fail("Container not decreased: containerId=" + containerId);
+    }
+  }
+
+  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMContainer rmContainer = cs.getRMContainer(containerId);
+    if (rmContainer != null) { // deliver LAUNCHED directly to the RMContainer
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+    } else {
+      Assert.fail("Cannot find RMContainer");
+    }
+  }
+
+  private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId,
+      int expectedMemory) { // asserts the node's available memory
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    SchedulerNode node = cs.getNode(nodeId);
+    Assert
+        .assertEquals(expectedMemory, node.getAvailableResource().getMemory());
+  }
+
+  private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+      ApplicationId appId) { // current attempt of appId in the scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89cab1ba/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index fe8be06..b85c697 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -770,9 +770,9 @@ public class TestLeafQueue { qb.finishApplication(app_0.getApplicationId(), user_0); qb.finishApplication(app_2.getApplicationId(), user_1); qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority), - null, null); + null, null, false); qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority), - null, null); + null, null, false); qb.setUserLimit(50); qb.setUserLimitFactor(1);