From common-commits-return-86392-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Thu Aug 2 19:17:33 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id EA4C71807A1 for ; Thu, 2 Aug 2018 19:17:29 +0200 (CEST) Received: (qmail 24413 invoked by uid 500); 2 Aug 2018 17:17:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 22962 invoked by uid 99); 2 Aug 2018 17:17:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Aug 2018 17:17:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C891BE1193; Thu, 2 Aug 2018 17:17:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: botong@apache.org To: common-commits@hadoop.apache.org Date: Thu, 02 Aug 2018 17:17:46 -0000 Message-Id: <412da1cd712f4834a69b4e2f62bb8c86@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] hadoop git commit: YARN-8418. App local logs could leaked if log aggregation fails to initialize for the app. (Bibin A Chundatt via wangda) YARN-8418. App local logs could leaked if log aggregation fails to initialize for the app. (Bibin A Chundatt via wangda) Change-Id: I29a23ca4b219b48c92e7975cd44cddb8b0e04104 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b540bbf Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b540bbf Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b540bbf Branch: refs/heads/YARN-7402 Commit: 4b540bbfcf02d828052999215c6135603d98f5db Parents: 8aa93a5 Author: Wangda Tan Authored: Tue Jul 31 12:07:51 2018 -0700 Committer: Wangda Tan Committed: Tue Jul 31 12:08:00 2018 -0700 ---------------------------------------------------------------------- .../LogAggregationFileController.java | 7 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 1 + .../containermanager/ContainerManager.java | 1 + .../containermanager/ContainerManagerImpl.java | 13 ++- .../logaggregation/AppLogAggregator.java | 8 ++ .../logaggregation/AppLogAggregatorImpl.java | 15 ++++ .../logaggregation/LogAggregationService.java | 83 ++++++++++++++++---- .../containermanager/loghandler/LogHandler.java | 7 ++ .../loghandler/NonAggregatingLogHandler.java | 9 +++ .../loghandler/event/LogHandlerEventType.java | 4 +- .../event/LogHandlerTokenUpdatedEvent.java | 26 ++++++ .../nodemanager/DummyContainerManager.java | 7 ++ .../TestLogAggregationService.java | 34 +++++--- 13 files changed, 187 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index b047b1c..6b3c9a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -43,11 +43,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.webapp.View.ViewContext; @@ -365,6 +368,10 @@ public abstract class LogAggregationFileController { } }); } catch (Exception e) { + if (e instanceof RemoteException) { + throw new YarnRuntimeException(((RemoteException) e) + .unwrapRemoteException(SecretManager.InvalidToken.class)); + } throw new YarnRuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8154723..faf7adb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1135,6 +1135,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context).setSystemCrendentialsForApps( parseCredentials(systemCredentials)); + context.getContainerManager().handleCredentialUpdate(); } List containersToUpdate = response.getContainersToUpdate(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java index 2aeb245..356c2e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -44,4 +44,5 @@ public interface ContainerManager extends ServiceStateChangeListener, ContainerScheduler getContainerScheduler(); + void handleCredentialUpdate(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ce240bc..8b35258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,7 +171,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -214,6 +214,7 @@ public class ContainerManagerImpl extends CompositeService implements protected final AsyncDispatcher dispatcher; private final DeletionService deletionService; + private LogHandler logHandler; private boolean serviceStopped = false; private final ReadLock readLock; private final WriteLock writeLock; @@ -292,7 +293,7 @@ public class ContainerManagerImpl extends CompositeService implements @Override public void serviceInit(Configuration conf) throws Exception { - LogHandler logHandler = + logHandler = createLogHandler(conf, this.context, this.deletionService); addIfService(logHandler); dispatcher.register(LogHandlerEventType.class, logHandler); @@ -1904,4 +1905,12 @@ public class ContainerManagerImpl extends CompositeService implements public ContainerScheduler getContainerScheduler() { return this.containerScheduler; } + + @Override + public void handleCredentialUpdate() { + Set invalidApps = logHandler.getInvalidTokenApps(); + if (!invalidApps.isEmpty()) { + dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent()); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 0178699..93436fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.server.api.ContainerLogContext; public interface AppLogAggregator extends Runnable { @@ -29,4 +31,10 @@ public interface AppLogAggregator extends Runnable { void finishLogAggregation(); void disableLogAggregation(); + + void enableLogAggregation(); + + boolean isAggregationEnabled(); + + UserGroupInformation updateCredentials(Credentials cred); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6630ba6..04503ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -561,6 +561,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.logAggregationDisabled = true; } + @Override + public void enableLogAggregation() { + this.logAggregationDisabled = false; + } + + @Override + public boolean isAggregationEnabled() { + return !logAggregationDisabled; + } + @Private @VisibleForTesting // This is only used for testing. @@ -643,6 +653,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return this.userUgi; } + public UserGroupInformation updateCredentials(Credentials cred) { + this.userUgi.addCredentials(cred); + return userUgi; + } + @Private @VisibleForTesting public int getLogAggregationTimes() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index dcc165f..d8db967 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.security.token.SecretManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; + import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -83,6 +88,9 @@ public class LogAggregationService extends AbstractService implements private final ConcurrentMap appLogAggregators; + // Holds applications whose aggregation is disable due to invalid Token + private final Set invalidTokenApps; + @VisibleForTesting ExecutorService threadPool; @@ -95,6 +103,7 @@ public class LogAggregationService extends AbstractService implements this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); + this.invalidTokenApps = ConcurrentHashMap.newKeySet(); } protected void serviceInit(Configuration conf) throws Exception { @@ -224,8 +233,8 @@ public class LogAggregationService extends AbstractService implements userUgi.addCredentials(credentials); } - LogAggregationFileController logAggregationFileController - = getLogAggregationFileController(getConfig()); + LogAggregationFileController logAggregationFileController = + getLogAggregationFileController(getConfig()); logAggregationFileController.verifyAndCreateRemoteLogDir(); // New application final AppLogAggregator appLogAggregator = @@ -245,14 +254,16 @@ public class LogAggregationService extends AbstractService implements logAggregationFileController.createAppDir(user, appId, userUgi); } catch (Exception e) { appLogAggregator.disableLogAggregation(); + + // add to disabled aggregators if due to InvalidToken + if (e.getCause() instanceof SecretManager.InvalidToken) { + invalidTokenApps.add(appId); + } if (!(e instanceof YarnRuntimeException)) { appDirException = new YarnRuntimeException(e); } else { appDirException = (YarnRuntimeException)e; } - appLogAggregators.remove(appId); - closeFileSystems(userUgi); - throw appDirException; } // TODO Get the user configuration for the list of containers that need log @@ -270,6 +281,10 @@ public class LogAggregationService extends AbstractService implements } }; this.threadPool.execute(aggregatorWrapper); + + if (appDirException != null) { + throw appDirException; + } } protected void closeFileSystems(final UserGroupInformation userUgi) { @@ -307,17 +322,20 @@ public class LogAggregationService extends AbstractService implements // App is complete. Finish up any containers' pending log aggregation and // close the application specific logFile. - - AppLogAggregator aggregator = this.appLogAggregators.get(appId); - if (aggregator == null) { - LOG.warn("Log aggregation is not initialized for " + appId - + ", did it fail to start?"); - this.dispatcher.getEventHandler().handle( - new ApplicationEvent(appId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); - return; + try { + AppLogAggregator aggregator = this.appLogAggregators.get(appId); + if (aggregator == null) { + LOG.warn("Log aggregation is not initialized for " + appId + + ", did it fail to start?"); + this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); + return; + } + aggregator.finishLogAggregation(); + } finally { + // Remove invalid Token Apps + invalidTokenApps.remove(appId); } - aggregator.finishLogAggregation(); } @Override @@ -344,12 +362,47 @@ public class LogAggregationService extends AbstractService implements (LogHandlerAppFinishedEvent) event; stopApp(appFinishedEvent.getApplicationId()); break; + case LOG_AGG_TOKEN_UPDATE: + checkAndEnableAppAggregators(); + break; default: ; // Ignore } } + private void checkAndEnableAppAggregators() { + for (ApplicationId appId : invalidTokenApps) { + try { + AppLogAggregator aggregator = appLogAggregators.get(appId); + if (aggregator != null) { + Credentials credentials = + context.getSystemCredentialsForApps().get(appId); + if (credentials != null) { + // Create the app dir again with + LogAggregationFileController logAggregationFileController = + getLogAggregationFileController(getConfig()); + UserGroupInformation userUgi = + aggregator.updateCredentials(credentials); + logAggregationFileController + .createAppDir(userUgi.getShortUserName(), appId, userUgi); + aggregator.enableLogAggregation(); + } + invalidTokenApps.remove(appId); + LOG.info("LogAggregation enabled for application {}", appId); + } + } catch (Exception e) { + //Ignore exception + LOG.warn("Enable aggregators failed {}", appId); + } + } + } + + @Override + public Set getInvalidTokenApps() { + return invalidTokenApps; + } + @VisibleForTesting public ConcurrentMap getAppLogAggregators() { return this.appLogAggregators; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java index 6eb3fb4..459fdf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java @@ -18,9 +18,16 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; + + +import java.util.Set; + public interface LogHandler extends EventHandler { public void handle(LogHandlerEvent event); + + public Set getInvalidTokenApps(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 9c43dde..d66aa12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -204,6 +208,11 @@ public class NonAggregatingLogHandler extends AbstractService implements } } + @Override + public Set getInvalidTokenApps() { + return Collections.emptySet(); + } + ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( Configuration conf) { ThreadFactory tf = http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java index 684d6b2..ec477c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java @@ -19,5 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; public enum LogHandlerEventType { - APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED + APPLICATION_STARTED, + CONTAINER_FINISHED, + APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java new file mode 100644 index 0000000..772a463 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; + +public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent { + + public LogHandlerTokenUpdatedEvent() { + super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index b5cb43b..feabeb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -187,6 +189,11 @@ public class DummyContainerManager extends ContainerManagerImpl { // Ignore } } + + @Override + public Set getInvalidTokenApps() { + return Collections.emptySet(); + } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6268ad9..8b2e3cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -73,6 +73,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -128,6 +129,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -823,7 +825,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { .getFileControllerForWrite(); LogAggregationFileController spyLogAggregationFileFormat = spy(logAggregationFileFormat); - Exception e = new RuntimeException("KABOOM!"); + Exception e = + new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!")); doThrow(e).when(spyLogAggregationFileFormat) .createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); @@ -862,29 +865,40 @@ public class TestLogAggregationService extends BaseContainerManagerTest { }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - + Assert.assertEquals(logAggregationService.getInvalidTokenApps().size(), 1); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( BuilderUtils.newContainerId(4, 1, 1, 1), ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); + + AppLogAggregator appAgg = + logAggregationService.getAppLogAggregators().get(appId); + Assert.assertFalse("Aggregation should be disabled", + appAgg.isAggregationEnabled()); + + // Enabled aggregation + logAggregationService.handle(new LogHandlerTokenUpdatedEvent()); + dispatcher.await(); + + appAgg = + logAggregationService.getAppLogAggregators().get(appId); + Assert.assertFalse("Aggregation should be enabled", + appAgg.isAggregationEnabled()); + + // Check disabled apps are cleared + Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size()); + logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); dispatcher.await(); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - // local log dir shouldn't be deleted given log aggregation cannot - // continue due to aggregated log dir creation failure on remoteFS. - FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user, - null, null); - verify(spyDelSrvc, never()).delete(deletionTask); + verify(spyDelSrvc).delete(any(FileDeletionTask.class)); verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); - // make sure local log dir is not deleted in case log aggregation - // service cannot be initiated. - assertTrue(appLogDir.exists()); } private void writeContainerLogs(File appLogDir, ContainerId containerId, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org