Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EBF62D64F for ; Wed, 5 Sep 2012 19:40:38 +0000 (UTC) Received: (qmail 8852 invoked by uid 500); 5 Sep 2012 19:40:38 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 8829 invoked by uid 500); 5 Sep 2012 19:40:38 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 8816 invoked by uid 99); 5 Sep 2012 19:40:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Sep 2012 19:40:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Sep 2012 19:40:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E7009238890D; Wed, 5 Sep 2012 19:39:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1381317 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ hadoop-yarn/hadoop-yarn-server... Date: Wed, 05 Sep 2012 19:39:53 -0000 To: yarn-commits@hadoop.apache.org From: bobby@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120905193953.E7009238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: bobby Date: Wed Sep 5 19:39:53 2012 New Revision: 1381317 URL: http://svn.apache.org/viewvc?rev=1381317&view=rev Log: YARN-68. NodeManager will refuse to shutdown indefinitely due to container log aggregation (daryn via bobby) Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1381317&r1=1381316&r2=1381317&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Sep 5 19:39:53 2012 @@ -111,3 +111,6 @@ Release 0.23.3 - Unreleased thus causes all containers to be rejected. (vinodkv) YARN-66. aggregated logs permissions not set properly (tgraves via bobby) + + YARN-68. NodeManager will refuse to shutdown indefinitely due to container + log aggregation (daryn via bobby) Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1381317&r1=1381316&r2=1381317&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed Sep 5 19:39:53 2012 @@ -26,7 +26,4 @@ public interface AppLogAggregator extend boolean wasContainerSuccessful); void finishLogAggregation(); - - void join(); - } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1381317&r1=1381316&r2=1381317&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Sep 5 19:39:53 2012 @@ -137,6 +137,9 @@ public class AppLogAggregatorImpl implem try { doAppLogAggregation(); } finally { + if (!this.appAggregationFinished.get()) { + LOG.warn("Aggregation did not complete for application " + appId); + } this.appAggregationFinished.set(true); } } @@ -155,6 +158,7 @@ public class AppLogAggregatorImpl implem } } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); + this.appFinishing.set(true); } } @@ -197,6 +201,7 @@ public class AppLogAggregatorImpl implem this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + this.appAggregationFinished.set(true); } private Path getRemoteNodeTmpLogFileForApp() { @@ -250,21 +255,4 @@ public class AppLogAggregatorImpl implem LOG.info("Application just finished : " + this.applicationId); this.appFinishing.set(true); } - - @Override - public void join() { - // Aggregation service is finishing - this.finishLogAggregation(); - - while (!this.appAggregationFinished.get()) { - LOG.info("Waiting for aggregation to complete for " - + this.applicationId); - try { - Thread.sleep(THREAD_SLEEP_TIME); - } catch (InterruptedException e) { - LOG.warn("Join interrupted. Some logs may not have been aggregated!!"); - break; - } - } - } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1381317&r1=1381316&r2=1381317&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Sep 5 19:39:53 2012 @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,8 +36,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -137,11 +136,33 @@ public class LogAggregationService exten @Override public synchronized void stop() { LOG.info(this.getName() + " waiting for pending aggregation during exit"); - for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) { - appLogAggregator.join(); - } + stopAggregators(); super.stop(); } + + private void stopAggregators() { + threadPool.shutdown(); + // politely ask to finish + for (AppLogAggregator aggregator : appLogAggregators.values()) { + aggregator.finishLogAggregation(); + } + while (!threadPool.isTerminated()) { // wait for all threads to finish + for (ApplicationId appId : appLogAggregators.keySet()) { + LOG.info("Waiting for aggregation to complete for " + appId); + } + try { + if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); // send interrupt to hurry them along + } + } catch (InterruptedException e) { + LOG.warn("Aggregation stop interrupted!"); + break; + } + } + for (ApplicationId appId : appLogAggregators.keySet()) { + LOG.warn("Some logs may not have been aggregated for " + appId); + } + } private void verifyAndCreateRemoteLogDir(Configuration conf) { // Checking the existance of the TLD @@ -293,10 +314,7 @@ public class LogAggregationService exten final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); if (credentials != null) { - for (Token token : credentials - .getAllTokens()) { - userUgi.addToken(token); - } + userUgi.addCredentials(credentials); } // New application @@ -312,9 +330,13 @@ public class LogAggregationService exten try { // Create the app dir createAppDir(user, appId, userUgi); - } catch (YarnException e) { + } catch (Exception e) { + appLogAggregators.remove(appId); closeFileSystems(userUgi); - throw e; + if (!(e instanceof YarnException)) { + e = new YarnException(e); + } + throw (YarnException)e; } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1381317&r1=1381316&r2=1381317&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Sep 5 19:39:53 2012 @@ -157,14 +157,18 @@ public class TestLogAggregationService e application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); // ensure filesystems were closed verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); + delSrvc.stop(); + String containerIdStr = ConverterUtils.toString(container11); File containerLogDir = new File(app1LogDir, containerIdStr); for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { - Assert.assertFalse(new File(containerLogDir, fileType).exists()); + File f = new File(containerLogDir, fileType); + Assert.assertFalse("check "+f, f.exists()); } Assert.assertFalse(app1LogDir.exists()); @@ -222,6 +226,7 @@ public class TestLogAggregationService e application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); Assert.assertFalse(new File(logAggregationService .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) @@ -356,6 +361,7 @@ public class TestLogAggregationService e application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); verifyContainerLogs(logAggregationService, application1, new ContainerId[] { container11, container12 }); @@ -454,7 +460,8 @@ public class TestLogAggregationService e ApplicationId appId = BuilderUtils.newApplicationId( System.currentTimeMillis(), (int)Math.random()); - doThrow(new YarnException("KABOOM!")) + Exception e = new RuntimeException("KABOOM!"); + doThrow(e) .when(logAggregationService).createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, @@ -463,7 +470,8 @@ public class TestLogAggregationService e dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ - new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!") + new ApplicationFinishEvent(appId, + "Application failed to init aggregation: "+e) }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); @@ -479,6 +487,9 @@ public class TestLogAggregationService e logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); dispatcher.await(); + + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); } private void writeContainerLogs(File appLogDir, ContainerId containerId) @@ -690,6 +701,7 @@ public class TestLogAggregationService e ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); } @Test