Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7B069200C5B for ; Wed, 5 Apr 2017 02:56:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 79831160BA1; Wed, 5 Apr 2017 00:56:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4D215160B90 for ; Wed, 5 Apr 2017 02:56:37 +0200 (CEST) Received: (qmail 63540 invoked by uid 500); 5 Apr 2017 00:56:36 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 63531 invoked by uid 99); 5 Apr 2017 00:56:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Apr 2017 00:56:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5453CDFC8E; Wed, 5 Apr 2017 00:56:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mingma@apache.org To: common-commits@hadoop.apache.org Message-Id: <106145cb98f2466abd47963889c67ef4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-6004. Refactor TestResourceLocalizationService#testDownloadingResourcesOnContainer so that it is less than 150 lines. (Chris Trezzo via mingma) Date: Wed, 5 Apr 2017 00:56:36 +0000 (UTC) archived-at: Wed, 05 Apr 2017 00:56:38 -0000 Repository: hadoop Updated Branches: refs/heads/trunk 9cc04b470 -> 2d5c09b84 YARN-6004. Refactor TestResourceLocalizationService#testDownloadingResourcesOnContainer so that it is less than 150 lines. (Chris Trezzo via mingma) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d5c09b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d5c09b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d5c09b8 Branch: refs/heads/trunk Commit: 2d5c09b8481d8cb4c2c517df5a9838aa8a875222 Parents: 9cc04b4 Author: Ming Ma Authored: Tue Apr 4 17:56:21 2017 -0700 Committer: Ming Ma Committed: Tue Apr 4 17:56:21 2017 -0700 ---------------------------------------------------------------------- .../TestResourceLocalizationService.java | 376 +++++++++++-------- 1 file changed, 212 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d5c09b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 41cccc1..932e94f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1124,7 +1124,6 @@ public class TestResourceLocalizationService { } @Test(timeout = 20000) - @SuppressWarnings("unchecked") public void testDownloadingResourcesOnContainerKill() throws Exception { List localDirs = new ArrayList(); String[] sDirs = new String[1]; @@ -1132,13 +1131,6 @@ public class TestResourceLocalizationService { sDirs[0] = localDirs.get(0).toString(); conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(conf); - dispatcher.start(); - EventHandler applicationBus = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, applicationBus); - EventHandler containerBus = mock(EventHandler.class); - dispatcher.register(ContainerEventType.class, containerBus); DummyExecutor exec = new DummyExecutor(); LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); @@ -1149,6 +1141,7 @@ public class TestResourceLocalizationService { delService.init(new Configuration()); delService.start(); + DrainDispatcher dispatcher = getDispatcher(conf); ResourceLocalizationService rawService = new ResourceLocalizationService( dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); @@ -1191,180 +1184,235 @@ public class TestResourceLocalizationService { spyService.init(conf); spyService.start(); - final Application app = mock(Application.class); - final ApplicationId appId = - BuilderUtils.newApplicationId(314159265358979L, 3); - String user = "user0"; - when(app.getUser()).thenReturn(user); - when(app.getAppId()).thenReturn(appId); - spyService.handle(new ApplicationLocalizationEvent( - LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - ArgumentMatcher matchesAppInit = + doLocalization(spyService, dispatcher, exec, delService); + + } finally { + spyService.stop(); + dispatcher.stop(); + delService.stop(); + } + } + + private DrainDispatcher getDispatcher(Configuration config) { + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(config); + dispatcher.start(); + return dispatcher; + } + + @SuppressWarnings("unchecked") + private EventHandler getApplicationBus( + DrainDispatcher dispatcher) { + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + return applicationBus; + } + + @SuppressWarnings("unchecked") + private EventHandler getContainerBus( + DrainDispatcher dispatcher) { + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + return containerBus; + } + + private void initApp(ResourceLocalizationService spyService, + EventHandler applicationBus, Application app, + ApplicationId appId, DrainDispatcher dispatcher) { + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + ArgumentMatcher matchesAppInit = new ArgumentMatcher() { @Override public boolean matches(Object o) { ApplicationEvent evt = (ApplicationEvent) o; return evt.getType() == ApplicationEventType.APPLICATION_INITED - && appId == evt.getApplicationID(); + && appId == evt.getApplicationID(); } }; - dispatcher.await(); - verify(applicationBus).handle(argThat(matchesAppInit)); - - // Initialize localizer. - Random r = new Random(); - long seed = r.nextLong(); - System.out.println("SEED: " + seed); - r.setSeed(seed); - final Container c1 = getMockContainer(appId, 42, "user0"); - final Container c2 = getMockContainer(appId, 43, "user0"); - FSDataOutputStream out = - new FSDataOutputStream(new DataOutputBuffer(), null); - doReturn(out).when(spylfs).createInternal(isA(Path.class), - isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), - anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), - anyBoolean()); - final LocalResource resource1 = getPrivateMockedResource(r); - LocalResource resource2 = null; - do { - resource2 = getPrivateMockedResource(r); - } while (resource2 == null || resource2.equals(resource1)); - LocalResource resource3 = null; - do { - resource3 = getPrivateMockedResource(r); - } while (resource3 == null || resource3.equals(resource1) - || resource3.equals(resource2)); + dispatcher.await(); + verify(applicationBus).handle(argThat(matchesAppInit)); + } - // Send localization requests for container c1 and c2. - final LocalResourceRequest req1 = new LocalResourceRequest(resource1); - final LocalResourceRequest req2 = new LocalResourceRequest(resource2); - final LocalResourceRequest req3 = new LocalResourceRequest(resource3); - Map> rsrcs = + private void doLocalization(ResourceLocalizationService spyService, + DrainDispatcher dispatcher, DummyExecutor exec, + DeletionService delService) + throws IOException, URISyntaxException, InterruptedException { + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + String user = "user0"; + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + List resources = initializeLocalizer(appId); + LocalResource resource1 = resources.get(0); + LocalResource resource2 = resources.get(1); + LocalResource resource3 = resources.get(2); + final Container c1 = getMockContainer(appId, 42, "user0"); + final Container c2 = getMockContainer(appId, 43, "user0"); + + EventHandler applicationBus = + getApplicationBus(dispatcher); + EventHandler containerBus = getContainerBus(dispatcher); + initApp(spyService, applicationBus, app, appId, dispatcher); + + // Send localization requests for container c1 and c2. + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + final LocalResourceRequest req2 = new LocalResourceRequest(resource2); + final LocalResourceRequest req3 = new LocalResourceRequest(resource3); + Map> rsrcs = new HashMap>(); - List privateResourceList = - new ArrayList(); - privateResourceList.add(req1); - privateResourceList.add(req2); - privateResourceList.add(req3); - rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); - spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs)); - - final LocalResourceRequest req1_1 = new LocalResourceRequest(resource2); - Map> rsrcs1 = + Collection>(); + List privateResourceList = + new ArrayList(); + privateResourceList.add(req1); + privateResourceList.add(req2); + privateResourceList.add(req3); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs)); + + final LocalResourceRequest req11 = new LocalResourceRequest(resource2); + Map> rsrcs1 = new HashMap>(); - List privateResourceList1 = - new ArrayList(); - privateResourceList1.add(req1_1); - rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1); - spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1)); - - dispatcher.await(); - // Wait for localizers of both container c1 and c2 to begin. - exec.waitForLocalizers(2); - LocalizerRunner locC1 = - spyService.getLocalizerRunner(c1.getContainerId().toString()); - final String containerIdStr = c1.getContainerId().toString(); - // Heartbeats from container localizer - LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class); - LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class); - LocalizerStatus stat = mock(LocalizerStatus.class); - when(stat.getLocalizerId()).thenReturn(containerIdStr); - when(rsrc1success.getResource()).thenReturn(resource1); - when(rsrc2pending.getResource()).thenReturn(resource2); - when(rsrc1success.getLocalSize()).thenReturn(4344L); - URL locPath = getPath("/some/path"); - when(rsrc1success.getLocalPath()).thenReturn(locPath); - when(rsrc1success.getStatus()). - thenReturn(ResourceStatusType.FETCH_SUCCESS); - when(rsrc2pending.getStatus()). - thenReturn(ResourceStatusType.FETCH_PENDING); - - when(stat.getResources()) - .thenReturn(Collections.emptyList()) - .thenReturn(Collections.singletonList(rsrc1success)) - .thenReturn(Collections.singletonList(rsrc2pending)) - .thenReturn(Collections.singletonList(rsrc2pending)) - .thenReturn(Collections.emptyList()); - - // First heartbeat which schedules first resource. - LocalizerHeartbeatResponse response = spyService.heartbeat(stat); - assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - - // Second heartbeat which reports first resource as success. - // Second resource is scheduled. - response = spyService.heartbeat(stat); - assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - final String locPath1 = response.getResourceSpecs().get(0). - getDestinationDirectory().getFile(); - - // Third heartbeat which reports second resource as pending. - // Third resource is scheduled. - response = spyService.heartbeat(stat); - assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - final String locPath2 = response.getResourceSpecs().get(0). - getDestinationDirectory().getFile(); - - // Container c1 is killed which leads to cleanup - spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs)); - - // This heartbeat will indicate to container localizer to die as localizer - // runner has stopped. - response = spyService.heartbeat(stat); - assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); - - exec.setStopLocalization(); - dispatcher.await(); - // verify container notification - ArgumentMatcher successContainerLoc = + Collection>(); + List privateResourceList1 = + new ArrayList(); + privateResourceList1.add(req11); + rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1); + spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1)); + + dispatcher.await(); + // Wait for localizers of both container c1 and c2 to begin. + exec.waitForLocalizers(2); + LocalizerRunner locC1 = + spyService.getLocalizerRunner(c1.getContainerId().toString()); + + LocalizerStatus stat = mockLocalizerStatus(c1, resource1, resource2); + + // First heartbeat which schedules first resource. + LocalizerHeartbeatResponse response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + + // Second heartbeat which reports first resource as success. + // Second resource is scheduled. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + final String locPath1 = + response.getResourceSpecs().get(0).getDestinationDirectory().getFile(); + + // Third heartbeat which reports second resource as pending. + // Third resource is scheduled. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + final String locPath2 = + response.getResourceSpecs().get(0).getDestinationDirectory().getFile(); + + // Container c1 is killed which leads to cleanup + spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs)); + + // This heartbeat will indicate to container localizer to die as localizer + // runner has stopped. + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); + + exec.setStopLocalization(); + dispatcher.await(); + // verify container notification + ArgumentMatcher successContainerLoc = new ArgumentMatcher() { @Override public boolean matches(Object o) { ContainerEvent evt = (ContainerEvent) o; return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED - && c1.getContainerId() == evt.getContainerID(); + && c1.getContainerId() == evt.getContainerID(); } }; - // Only one resource gets localized for container c1. - verify(containerBus).handle(argThat(successContainerLoc)); - - Set paths = - Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"), - new Path(locPath2), new Path(locPath2 + "_tmp")); - // Wait for localizer runner thread for container c1 to finish. - while (locC1.getState() != Thread.State.TERMINATED) { - Thread.sleep(50); - } - // Verify if downloading resources were submitted for deletion. - verify(delService).delete(eq(user), - (Path) eq(null), argThat(new DownloadingPathsMatcher(paths))); - - LocalResourcesTracker tracker = spyService.getLocalResourcesTracker( - LocalResourceVisibility.PRIVATE, "user0", appId); - // Container c1 was killed but this resource was localized before kill - // hence its not removed despite ref cnt being 0. - LocalizedResource rsrc1 = tracker.getLocalizedResource(req1); - assertNotNull(rsrc1); - assertEquals(rsrc1.getState(), ResourceState.LOCALIZED); - assertEquals(rsrc1.getRefCount(), 0); - - // Container c1 was killed but this resource is referenced by container c2 - // as well hence its ref cnt is 1. - LocalizedResource rsrc2 = tracker.getLocalizedResource(req2); - assertNotNull(rsrc2); - assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING); - assertEquals(rsrc2.getRefCount(), 1); - - // As container c1 was killed and this resource was not referenced by any - // other container, hence its removed. - LocalizedResource rsrc3 = tracker.getLocalizedResource(req3); - assertNull(rsrc3); - } finally { - spyService.stop(); - dispatcher.stop(); - delService.stop(); + // Only one resource gets localized for container c1. + verify(containerBus).handle(argThat(successContainerLoc)); + + Set paths = + Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"), + new Path(locPath2), new Path(locPath2 + "_tmp")); + // Wait for localizer runner thread for container c1 to finish. + while (locC1.getState() != Thread.State.TERMINATED) { + Thread.sleep(50); } + // Verify if downloading resources were submitted for deletion. + verify(delService).delete(eq(user), (Path) eq(null), + argThat(new DownloadingPathsMatcher(paths))); + + LocalResourcesTracker tracker = spyService.getLocalResourcesTracker( + LocalResourceVisibility.PRIVATE, "user0", appId); + // Container c1 was killed but this resource was localized before kill + // hence its not removed despite ref cnt being 0. + LocalizedResource rsrc1 = tracker.getLocalizedResource(req1); + assertNotNull(rsrc1); + assertEquals(rsrc1.getState(), ResourceState.LOCALIZED); + assertEquals(rsrc1.getRefCount(), 0); + + // Container c1 was killed but this resource is referenced by container c2 + // as well hence its ref cnt is 1. + LocalizedResource rsrc2 = tracker.getLocalizedResource(req2); + assertNotNull(rsrc2); + assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING); + assertEquals(rsrc2.getRefCount(), 1); + + // As container c1 was killed and this resource was not referenced by any + // other container, hence its removed. + LocalizedResource rsrc3 = tracker.getLocalizedResource(req3); + assertNull(rsrc3); + } + + private LocalizerStatus mockLocalizerStatus(Container c1, + LocalResource resource1, LocalResource resource2) { + final String containerIdStr = c1.getContainerId().toString(); + // Heartbeats from container localizer + LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class); + LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class); + LocalizerStatus stat = mock(LocalizerStatus.class); + when(stat.getLocalizerId()).thenReturn(containerIdStr); + when(rsrc1success.getResource()).thenReturn(resource1); + when(rsrc2pending.getResource()).thenReturn(resource2); + when(rsrc1success.getLocalSize()).thenReturn(4344L); + URL locPath = getPath("/some/path"); + when(rsrc1success.getLocalPath()).thenReturn(locPath); + when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING); + + when(stat.getResources()) + .thenReturn(Collections. emptyList()) + .thenReturn(Collections.singletonList(rsrc1success)) + .thenReturn(Collections.singletonList(rsrc2pending)) + .thenReturn(Collections.singletonList(rsrc2pending)) + .thenReturn(Collections. emptyList()); + return stat; + } + + @SuppressWarnings("unchecked") + private List initializeLocalizer(ApplicationId appId) + throws IOException { + // Initialize localizer. + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + FSDataOutputStream out = + new FSDataOutputStream(new DataOutputBuffer(), null); + doReturn(out).when(spylfs).createInternal(isA(Path.class), + isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), + anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), + anyBoolean()); + final LocalResource resource1 = getPrivateMockedResource(r); + LocalResource resource2 = null; + do { + resource2 = getPrivateMockedResource(r); + } while (resource2 == null || resource2.equals(resource1)); + LocalResource resource3 = null; + do { + resource3 = getPrivateMockedResource(r); + } while (resource3 == null || resource3.equals(resource1) + || resource3.equals(resource2)); + return Arrays.asList(resource1, resource2, resource3); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org