Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E9BAB95E7 for ; Fri, 1 Jun 2012 22:01:38 +0000 (UTC) Received: (qmail 89285 invoked by uid 500); 1 Jun 2012 22:01:38 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 89234 invoked by uid 500); 1 Jun 2012 22:01:38 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 89226 invoked by uid 99); 1 Jun 2012 22:01:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2012 22:01:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2012 22:01:32 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 49DC8238897A; Fri, 1 Jun 2012 22:01:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1345366 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ hadoop-yarn/hadoop-yarn-s... 
Date: Fri, 01 Jun 2012 22:01:09 -0000 To: mapreduce-commits@hadoop.apache.org From: bobby@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120601220110.49DC8238897A@eris.apache.org> Author: bobby Date: Fri Jun 1 22:01:09 2012 New Revision: 1345366 URL: http://svn.apache.org/viewvc?rev=1345366&view=rev Log: svn merge -c 1345362. FIXES: MAPREDUCE-4302. NM goes down if error encountered during log aggregation (Daryn Sharp via bobby) Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java - copied unchanged from r1345362, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jun 1 22:01:09 2012 @@ -217,6 +217,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4297. Usersmap file in gridmix should not fail on empty lines (Ravi Prakash via bobby) + MAPREDUCE-4302. 
NM goes down if error encountered during log aggregation + (Daryn Sharp via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Fri Jun 1 22:01:09 2012 @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -524,8 +525,8 @@ 
public class ContainerManagerImpl extend (CMgrCompletedAppsEvent) event; for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) { this.dispatcher.getEventHandler().handle( - new ApplicationEvent(appID, - ApplicationEventType.FINISH_APPLICATION)); + new ApplicationFinishEvent(appID, + "Application Killed by ResourceManager")); } break; case FINISH_CONTAINERS: Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java Fri Jun 1 22:01:09 2012 @@ -23,7 +23,7 @@ public enum ApplicationEventType { // Source: ContainerManager INIT_APPLICATION, INIT_CONTAINER, - FINISH_APPLICATION, + FINISH_APPLICATION, // Source: LogAggregationService if init fails // Source: ResourceLocalizationService APPLICATION_INITED, @@ -33,5 +33,6 @@ public enum ApplicationEventType { APPLICATION_CONTAINER_FINISHED, // Source: Log Handler + APPLICATION_LOG_HANDLING_INITED, APPLICATION_LOG_HANDLING_FINISHED } Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Fri Jun 1 22:01:09 2012 @@ -141,6 +141,9 @@ public class ApplicationImpl implements ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationEventType.FINISH_APPLICATION, new AppFinishTriggeredTransition()) + .addTransition(ApplicationState.INITING, ApplicationState.INITING, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, + new AppLogInitDoneTransition()) .addTransition(ApplicationState.INITING, ApplicationState.RUNNING, ApplicationEventType.APPLICATION_INITED, new AppInitDoneTransition()) @@ -192,8 +195,7 @@ public class ApplicationImpl implements /** * Notify services of new application. * - * In particular, this requests that the {@link ResourceLocalizationService} - * localize the application-scoped resources. 
+ * In particular, this initializes the {@link LogAggregationService} */ @SuppressWarnings("unchecked") static class AppInitTransition implements @@ -203,6 +205,27 @@ public class ApplicationImpl implements ApplicationInitEvent initEvent = (ApplicationInitEvent)event; app.applicationACLs = initEvent.getApplicationACLs(); app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); + // Inform the logAggregator + app.dispatcher.getEventHandler().handle( + new LogHandlerAppStartedEvent(app.appId, app.user, + app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, + app.applicationACLs)); + } + } + + /** + * Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after + * {@link LogAggregationService} has created the directories for the app + * and started the aggregation thread for the app. + * + * In particular, this requests that the {@link ResourceLocalizationService} + * localize the application-scoped resources. + */ + @SuppressWarnings("unchecked") + static class AppLogInitDoneTransition implements + SingleArcTransition { + @Override + public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); @@ -248,13 +271,6 @@ public class ApplicationImpl implements SingleArcTransition { @Override public void transition(ApplicationImpl app, ApplicationEvent event) { - - // Inform the logAggregator - app.dispatcher.getEventHandler().handle( - new LogHandlerAppStartedEvent(app.appId, app.user, - app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, - app.applicationACLs)); - // Start all the containers waiting for ApplicationInit for (Container container : app.containers.values()) { app.dispatcher.getEventHandler().handle(new ContainerInitEvent( Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Fri Jun 1 22:01:09 2012 @@ -49,6 +49,9 @@ import org.apache.hadoop.yarn.logaggrega import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LogAggregationService extends AbstractService implements @@ -146,13 +150,13 @@ public class LogAggregationService exten try { remoteFS = FileSystem.get(conf); } catch (IOException e) { - throw new YarnException("Unable to get Remote FileSystem isntance", e); + throw new YarnException("Unable to get Remote FileSystem instance", e); } boolean remoteExists = false; try { remoteExists = remoteFS.exists(this.remoteRootLogDir); } catch (IOException e) { - throw new YarnException("Failed to check for existance of remoteLogDir [" + throw new YarnException("Failed to check for existence of remoteLogDir [" + this.remoteRootLogDir + "]"); } if (remoteExists) { @@ -266,9 +270,26 @@ public class LogAggregationService exten } } + @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, Map appAcls) { + ApplicationEvent eventResponse; + try { + initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); + eventResponse = new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); + } catch (YarnException e) { + eventResponse = new ApplicationFinishEvent(appId, + "Application failed to init aggregation: " + e.getMessage()); + } + this.dispatcher.getEventHandler().handle(eventResponse); + } + + @VisibleForTesting + public void initAppAggregator(final ApplicationId appId, String user, + Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, + Map appAcls) { // Get user's 
FileSystem credentials UserGroupInformation userUgi = Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java Fri Jun 1 22:01:09 2012 @@ -93,6 +93,7 @@ public class NonAggregatingLogHandler ex super.stop(); } + @SuppressWarnings("unchecked") @Override public void handle(LogHandlerEvent event) { switch (event.getType()) { @@ -101,6 +102,9 @@ public class NonAggregatingLogHandler ex (LogHandlerAppStartedEvent) event; this.appOwners.put(appStartedEvent.getApplicationId(), appStartedEvent.getUser()); + this.dispatcher.getEventHandler().handle( + new ApplicationEvent(appStartedEvent.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)); break; case CONTAINER_FINISHED: // Ignore Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1345366&r1=1345365&r2=1345366&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Fri Jun 1 22:01:09 2012 @@ -18,10 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; @@ -32,6 +29,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -47,6 +45,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataInputBuffer; import 
org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; @@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.util.Build import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mortbay.util.MultiException; //@Ignore @@ -112,7 +115,7 @@ public class TestLogAggregationService e @Test @SuppressWarnings("unchecked") - public void testLocalFileDeletionAfterUpload() throws IOException { + public void testLocalFileDeletionAfterUpload() throws 
Exception { this.delSrvc = new DeletionService(createContainerExecutor()); this.delSrvc.init(conf); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); @@ -170,19 +173,23 @@ public class TestLogAggregationService e logFilePath.toUri().getPath()).exists()); dispatcher.await(); - ArgumentCaptor eventCaptor = - ArgumentCaptor.forClass(ApplicationEvent.class); - verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, - eventCaptor.getValue().getType()); - assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue() - .getApplicationID()); + ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ + new ApplicationEvent( + appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) + }; + + checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID"); + dispatcher.stop(); } @Test @SuppressWarnings("unchecked") - public void testNoContainerOnNode() { + public void testNoContainerOnNode() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); @@ -218,19 +225,22 @@ public class TestLogAggregationService e .exists()); dispatcher.await(); - ArgumentCaptor eventCaptor = - ArgumentCaptor.forClass(ApplicationEvent.class); - verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, - eventCaptor.getValue().getType()); - verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(application1, eventCaptor.getValue() - .getApplicationID()); + + ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + 
new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) + }; + checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID"); + dispatcher.stop(); } @Test @SuppressWarnings("unchecked") - public void testMultipleAppsLogAggregation() throws IOException { + public void testMultipleAppsLogAggregation() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, @@ -299,9 +309,22 @@ public class TestLogAggregationService e app3LogDir.mkdir(); logAggregationService.handle(new LogHandlerAppStartedEvent(application3, this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); - + ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); + ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{ + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + application3, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) + }; + checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID"); + reset(appEventHandler); + ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container31); logAggregationService.handle( @@ -339,22 +362,59 @@ public class TestLogAggregationService e new ContainerId[] { container31, container32 }); dispatcher.await(); - ArgumentCaptor eventCaptor = - ArgumentCaptor.forClass(ApplicationEvent.class); - - verify(appEventHandler, times(3)).handle(eventCaptor.capture()); - List capturedEvents = eventCaptor.getAllValues(); - Set appIds = new HashSet(); - for (ApplicationEvent cap : capturedEvents) { - assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, - eventCaptor.getValue().getType()); - 
appIds.add(cap.getApplicationID()); - } - assertTrue(appIds.contains(application1)); - assertTrue(appIds.contains(application2)); - assertTrue(appIds.contains(application3)); + + ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[]{ + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), + new ApplicationEvent( + application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), + new ApplicationEvent( + application3, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) + }; + checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID"); + dispatcher.stop(); } - + + @Test + @SuppressWarnings("unchecked") + public void testLogAggregationInitFailsWithoutKillingNM() throws Exception { + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = spy( + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler)); + logAggregationService.init(this.conf); + logAggregationService.start(); + + ApplicationId appId = BuilderUtils.newApplicationId( + System.currentTimeMillis(), (int)Math.random()); + doThrow(new YarnException("KABOOM!")) + .when(logAggregationService).initAppAggregator( + eq(appId), eq(user), any(Credentials.class), + any(ContainerLogsRetentionPolicy.class), anyMap()); + + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, + this.user, null, + ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); + + dispatcher.await(); + ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ + new ApplicationFinishEvent(appId, "Application failed to init aggregation: 
KABOOM!") + }; + checkEvents(appEventHandler, expectedEvents, false, + "getType", "getApplicationID", "getDiagnostic"); + } + private void writeContainerLogs(File appLogDir, ContainerId containerId) throws IOException { // ContainerLogDir should be created @@ -599,4 +659,77 @@ public class TestLogAggregationService e Assert.assertEquals("Log aggregator failed to cleanup!", 0, logAggregationService.getNumAggregators()); } + + @SuppressWarnings("unchecked") + private static > + void checkEvents(EventHandler eventHandler, + T expectedEvents[], boolean inOrder, + String... methods) throws Exception { + Class genericClass = (Class)expectedEvents.getClass().getComponentType(); + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(genericClass); + // captor won't work unless used via a verify + verify(eventHandler, atLeast(0)).handle(eventCaptor.capture()); + List actualEvents = eventCaptor.getAllValues(); + + // batch up exceptions so junit presents them as one + MultiException failures = new MultiException(); + try { + assertEquals("expected events", expectedEvents.length, actualEvents.size()); + } catch (Throwable e) { + failures.add(e); + } + if (inOrder) { + // sequentially verify the events + int len = Math.max(expectedEvents.length, actualEvents.size()); + for (int n=0; n < len; n++) { + try { + String expect = (n < expectedEvents.length) + ? eventToString(expectedEvents[n], methods) : null; + String actual = (n < actualEvents.size()) + ? 
eventToString(actualEvents.get(n), methods) : null; + assertEquals("event#"+n, expect, actual); + } catch (Throwable e) { + failures.add(e); + } + } + } else { + // verify the actual events were expected + // verify no expected events were not seen + Set expectedSet = new HashSet(); + for (T expectedEvent : expectedEvents) { + expectedSet.add(eventToString(expectedEvent, methods)); + } + for (T actualEvent : actualEvents) { + try { + String actual = eventToString(actualEvent, methods); + assertTrue("unexpected event: "+actual, expectedSet.remove(actual)); + } catch (Throwable e) { + failures.add(e); + } + } + for (String expected : expectedSet) { + try { + Assert.fail("missing event: "+expected); + } catch (Throwable e) { + failures.add(e); + } + } + } + failures.ifExceptionThrow(); + } + + private static String eventToString(Event event, String[] methods) throws Exception { + StringBuilder sb = new StringBuilder("[ "); + for (String m : methods) { + try { + Method method = event.getClass().getMethod(m); + String value = method.invoke(event).toString(); + sb.append(method.getName()).append("=").append(value).append(" "); + } catch (Exception e) { + // ignore, actual event may not implement the method... + } + } + sb.append("]"); + return sb.toString(); + } }