From: sjlee@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Mon, 12 Oct 2015 17:11:13 -0000
Message-Id: <5971de238a6b42a994b33fbdae27036b@git.apache.org>
Subject: [33/50] [abbrv] hadoop git commit: YARN-3045. Implement NM writing container lifecycle events to Timeline Service v2. Contributed by Naganarasimha G R.

YARN-3045. Implement NM writing container lifecycle events to Timeline Service v2. Contributed by Naganarasimha G R.
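
For orientation, one pattern this commit relies on is a wrapping event handler: in ContainerManagerImpl, LocalizationEventHandlerWrapper first passes each event to the original localization handler and then hands the same event to NMTimelinePublisher. The following is only a minimal sketch of that delegation order. The names Handler, PublishingWrapper and demo are hypothetical and are not part of the Hadoop code base; plain strings stand in for YARN events.

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerWrapperSketch {

  /** Hypothetical stand-in for an event handler interface. */
  interface Handler<T> {
    void handle(T event);
  }

  /** Delegates to the original handler first, then notifies a publisher. */
  static final class PublishingWrapper<T> implements Handler<T> {
    private final Handler<T> original;
    private final Handler<T> publisher;

    PublishingWrapper(Handler<T> original, Handler<T> publisher) {
      this.original = original;
      this.publisher = publisher;
    }

    @Override
    public void handle(T event) {
      original.handle(event);   // normal processing happens first
      publisher.handle(event);  // then the same event is reported
    }
  }

  /** Runs one event through the wrapper and returns the recorded call order. */
  static List<String> demo() {
    final List<String> calls = new ArrayList<>();
    Handler<String> wrapped = new PublishingWrapper<String>(
        e -> calls.add("handled:" + e),
        e -> calls.add("published:" + e));
    wrapped.handle("LOCALIZED");
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the publisher is called only after the original handler returns, an exception thrown by the original handler means the event is never published in this sketch; whether that matches the intended behaviour of the real wrapper is not stated in the commit.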
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3fb80b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3fb80b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3fb80b9

Branch: refs/heads/YARN-2928-rebase
Commit: d3fb80b950fca3ed387de1f06618e4124bb7c0f4
Parents: 0fd93f1
Author: Junping Du
Authored: Tue Aug 18 04:31:45 2015 -0700
Committer: Sangjin Lee
Committed: Sat Oct 10 16:13:21 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |  16 +-
 .../distributedshell/TestDistributedShell.java  |  28 +-
 .../hadoop/yarn/server/nodemanager/Context.java |   5 +
 .../yarn/server/nodemanager/NodeManager.java    |  13 +
 .../containermanager/ContainerManagerImpl.java  |  46 ++-
 .../ApplicationContainerFinishedEvent.java      |  17 +-
 .../containermanager/container/Container.java   |   3 +
 .../container/ContainerImpl.java                |  30 +-
 .../monitor/ContainersMonitorImpl.java          | 108 +-----
 .../timelineservice/NMTimelineEvent.java        |  31 ++
 .../timelineservice/NMTimelineEventType.java    |  24 ++
 .../timelineservice/NMTimelinePublisher.java    | 376 +++++++++++++++++++
 .../nodemanager/TestNodeStatusUpdater.java      |  24 +-
 .../amrmproxy/BaseAMRMProxyTest.java            |   9 +
 .../containermanager/TestAuxServices.java       |   4 +-
 .../TestContainerManagerRecovery.java           |   8 +
 .../application/TestApplication.java            |   7 +-
 .../container/TestContainer.java                |   2 +-
 .../nodemanager/webapp/MockContainer.java       |   6 +
 .../nodemanager/webapp/TestNMWebServer.java     |   3 +-
 .../PerNodeTimelineCollectorsAuxService.java    |  16 +-
 ...TestPerNodeTimelineCollectorsAuxService.java |   9 +
 23 files changed, 646 insertions(+), 142 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff 
--git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 374b254..e5c609a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee) + YARN-3045. Implement NM writing container lifecycle events to Timeline + Service v2. (Naganarasimha G R via junping_du) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 691170e..62e60a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -110,6 +110,16 @@ + + + + + + + + + + @@ -505,10 +515,4 @@ - - - - - - http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index b8a7abf..c89bee9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; @@ -203,7 +204,7 @@ public class TestDistributedShell { testDSShell(false, "v2", false); } - public void testDSShell(boolean haveDomain, String timelineVersion, + private void testDSShell(boolean haveDomain, String timelineVersion, boolean defaultFlow) throws Exception { String[] args = { @@ -404,9 +405,32 @@ public class TestDistributedShell { "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + "_01_000001" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - verifyEntityTypeFileExists(basePath, + File containerEntityFile = verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_CONTAINER.toString(), containerMetricsTimestampFileName); + Assert.assertEquals( + "Container created event needs to be published atleast once", + 1, + getNumOfStringOccurences(containerEntityFile, + ContainerMetricsConstants.CREATED_EVENT_TYPE)); + + // to avoid race condition of testcase, atleast check 4 times with sleep + // of 500ms + long 
numOfContainerFinishedOccurences = 0; + for (int i = 0; i < 4; i++) { + numOfContainerFinishedOccurences = + getNumOfStringOccurences(containerEntityFile, + ContainerMetricsConstants.FINISHED_EVENT_TYPE); + if (numOfContainerFinishedOccurences > 0) { + break; + } else { + Thread.sleep(500l); + } + } + Assert.assertEquals( + "Container finished event needs to be published atleast once", + 1, + numOfContainerFinishedOccurences); // Verify RM posting Application life cycle Events are getting published String appMetricsTimestampFileName = http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 924860b..0b378a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; /** @@ 
-96,4 +97,8 @@ public interface Context { ConcurrentLinkedQueue getLogAggregationStatusForApps(); + + void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher); + + NMTimelinePublisher getNMTimelinePublisher(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index f64d7fe..8a9d1fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreServic import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -473,6 +474,8 @@ public class NodeManager extends CompositeService private final ConcurrentLinkedQueue logAggregationReportForApps; + private NMTimelinePublisher nmTimelinePublisher; + public NMContext(NMContainerTokenSecretManager 
containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -615,6 +618,16 @@ public class NodeManager extends CompositeService Map newRegisteredCollectors) { this.registeredCollectors.putAll(newRegisteredCollectors); } + + @Override + public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) { + this.nmTimelinePublisher = nmMetricsPublisher; + } + + @Override + public NMTimelinePublisher getNMTimelinePublisher() { + return nmTimelinePublisher; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index da79446..12aab79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService; @@ -142,6 +143,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -188,6 +190,8 @@ public class ContainerManagerImpl extends CompositeService implements private long waitForContainersOnShutdownMillis; + private final NMTimelinePublisher nmMetricsPublisher; + public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { @@ -214,6 +218,8 @@ public class ContainerManagerImpl extends CompositeService implements auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); + nmMetricsPublisher = createNMTimelinePublisher(context); + context.setNMTimelinePublisher(nmMetricsPublisher); this.containersMonitor = new ContainersMonitorImpl(exec, dispatcher, this.context); 
addService(this.containersMonitor); @@ -222,13 +228,16 @@ public class ContainerManagerImpl extends CompositeService implements new ContainerEventDispatcher()); dispatcher.register(ApplicationEventType.class, new ApplicationEventDispatcher()); - dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); + dispatcher.register(LocalizationEventType.class, + new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc, + nmMetricsPublisher)); dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); addService(dispatcher); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -351,7 +360,7 @@ public class ContainerManagerImpl extends CompositeService implements Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); + rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -413,6 +422,13 @@ public class ContainerManagerImpl extends CompositeService implements return new SharedCacheUploadService(); } + @VisibleForTesting + protected NMTimelinePublisher createNMTimelinePublisher(Context context) { + NMTimelinePublisher nmTimelinePublisherLocal = new NMTimelinePublisher(context); + addIfService(nmTimelinePublisherLocal); + return nmTimelinePublisherLocal; + } + protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); @@ -910,7 +926,7 @@ public class 
ContainerManagerImpl extends CompositeService implements Container container = new ContainerImpl(getConfig(), this.dispatcher, context.getNMStateStore(), launchContext, - credentials, metrics, containerTokenIdentifier); + credentials, metrics, containerTokenIdentifier, context); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerId, container) != null) { @@ -952,9 +968,9 @@ public class ContainerManagerImpl extends CompositeService implements logAggregationContext)); } - this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); + this.context.getNMStateStore().storeContainer(containerId, request); this.context.getContainerTokenSecretManager().startContainerSuccessful( containerTokenIdentifier); @@ -1291,6 +1307,7 @@ public class ContainerManagerImpl extends CompositeService implements Container c = containers.get(event.getContainerID()); if (c != null) { c.handle(event); + nmMetricsPublisher.publishContainerEvent(event); } else { LOG.warn("Event " + event + " sent to absent container " + event.getContainerID()); @@ -1299,7 +1316,6 @@ public class ContainerManagerImpl extends CompositeService implements } class ApplicationEventDispatcher implements EventHandler { - @Override public void handle(ApplicationEvent event) { Application app = @@ -1307,6 +1323,7 @@ public class ContainerManagerImpl extends CompositeService implements event.getApplicationID()); if (app != null) { app.handle(event); + nmMetricsPublisher.publishApplicationEvent(event); } else { LOG.warn("Event " + event + " sent to absent application " + event.getApplicationID()); @@ -1314,6 +1331,25 @@ public class ContainerManagerImpl extends CompositeService implements } } + private static final class LocalizationEventHandlerWrapper implements + EventHandler { + + private EventHandler origLocalizationEventHandler; + private 
NMTimelinePublisher timelinePublisher; + + LocalizationEventHandlerWrapper(EventHandler handler, + NMTimelinePublisher publisher) { + this.origLocalizationEventHandler = handler; + this.timelinePublisher = publisher; + } + + @Override + public void handle(LocalizationEvent event) { + origLocalizationEventHandler.handle(event); + timelinePublisher.publishLocalizationEvent(event); + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java index 6b8007f..9cd34cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java @@ -19,18 +19,23 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; public class ApplicationContainerFinishedEvent extends ApplicationEvent { - private 
ContainerId containerID; + private ContainerStatus containerStatus; - public ApplicationContainerFinishedEvent( - ContainerId containerID) { - super(containerID.getApplicationAttemptId().getApplicationId(), + public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) { + super(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(), ApplicationEventType.APPLICATION_CONTAINER_FINISHED); - this.containerID = containerID; + this.containerStatus = containerStatus; } public ContainerId getContainerID() { - return this.containerID; + return containerStatus.getContainerId(); } + + public ContainerStatus getContainerStatus() { + return containerStatus; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 1d2ec56..aac88fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -26,6 +26,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import 
org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -57,4 +58,6 @@ public interface Container extends EventHandler { String toString(); + Priority getPriority(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index eff2188..fb5f451 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -44,12 +44,14 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import 
org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -125,11 +128,12 @@ public class ContainerImpl implements Container { RecoveredContainerStatus.REQUESTED; // whether container was marked as killed after recovery private boolean recoveredAsKilled = false; + private Context context; public ContainerImpl(Configuration conf, Dispatcher dispatcher, NMStateStoreService stateStore, ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, - ContainerTokenIdentifier containerTokenIdentifier) { + ContainerTokenIdentifier containerTokenIdentifier, Context context) { this.daemonConf = conf; this.dispatcher = dispatcher; this.stateStore = stateStore; @@ -144,8 +148,8 @@ public class ContainerImpl implements Container { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); - stateMachine = stateMachineFactory.make(this); + 
this.context = context; } // constructor for a recovered container @@ -154,9 +158,10 @@ public class ContainerImpl implements Container { Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability) { + String diagnostics, boolean wasKilled, Resource recoveredCapability, + Context context) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, - containerTokenIdentifier); + containerTokenIdentifier, context); this.recoveredStatus = recoveredStatus; this.exitCode = exitCode; this.recoveredAsKilled = wasKilled; @@ -372,6 +377,10 @@ public class ContainerImpl implements Container { } } + public NMTimelinePublisher getNMTimelinePublisher() { + return context.getNMTimelinePublisher(); + } + @Override public String getUser() { this.readLock.lock(); @@ -483,7 +492,10 @@ public class ContainerImpl implements Container { // Inform the application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); - eventHandler.handle(new ApplicationContainerFinishedEvent(containerId)); + + ContainerStatus containerStatus = cloneAndGetContainerStatus(); + eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus)); + // Remove the container from the resource-monitor eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); // Tell the logService too @@ -985,7 +997,8 @@ public class ContainerImpl implements Container { public void transition(ContainerImpl container, ContainerEvent event) { container.metrics.releaseContainer(container.resource); container.sendFinishedEvents(); - //if the current state is NEW it means the CONTAINER_INIT was never + + // if the current state is NEW it means the CONTAINER_INIT was never // sent for the event, thus no need to send the CONTAINER_STOP if (container.getCurrentState() != 
org.apache.hadoop.yarn.api.records.ContainerState.NEW) { @@ -1171,4 +1184,9 @@ public class ContainerImpl implements Container { LocalResourceRequest resource) { return container.resourcesUploadPolicies.get(resource); } + + @Override + public Priority getPriority() { + return containerTokenIdentifier.getPriority(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 6e00e18..dfa32ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -18,13 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; -import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -36,25 +32,20 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import 
org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ContainersMonitorImpl extends AbstractService implements ContainersMonitor { @@ -86,17 +77,11 @@ public class ContainersMonitorImpl extends AbstractService implements private boolean pmemCheckEnabled; private boolean vmemCheckEnabled; private boolean containersMonitorEnabled; - - private boolean publishContainerMetricsToTimelineService; private long maxVCoresAllottedForContainers; private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; - - // For posting entities in new timeline service in a 
non-blocking way - // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool; @Private public static enum ContainerMetric { @@ -210,22 +195,6 @@ public class ContainersMonitorImpl extends AbstractService implements 1) + "). Thrashing might happen."); } } - - publishContainerMetricsToTimelineService = - YarnConfiguration.systemMetricsPublisherEnabled(conf); - - if (publishContainerMetricsToTimelineService) { - LOG.info("NodeManager has been configured to publish container " + - "metrics to Timeline Service V2."); - threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - } else { - LOG.warn("NodeManager has not been configured to publish container " + - "metrics to Timeline Service V2."); - } - super.serviceInit(conf); } @@ -269,29 +238,8 @@ public class ContainersMonitorImpl extends AbstractService implements } } - shutdownAndAwaitTermination(); - super.serviceStop(); } - - // TODO remove threadPool after adding non-blocking call in TimelineClient - private static void shutdownAndAwaitTermination() { - if (threadPool == null) { - return; - } - threadPool.shutdown(); - try { - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); - } - } catch (InterruptedException ie) { - threadPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } @VisibleForTesting static class ProcessTreeInfo { @@ -470,9 +418,6 @@ public class ContainersMonitorImpl extends AbstractService implements ContainerId containerId = entry.getKey(); ProcessTreeInfo ptInfo = entry.getValue(); - ContainerEntity entity = new ContainerEntity(); - entity.setId(containerId.toString()); - try { String pId = ptInfo.getPID(); @@ -556,26 +501,6 @@ public class ContainersMonitorImpl extends AbstractService implements 
containerMetricsUnregisterDelayMs).recordCpuUsage ((int)cpuUsagePercentPerCore, milliVcoresUsed); } - - if (publishContainerMetricsToTimelineService) { - // if currentPmemUsage data is available - if (currentPmemUsage != - ResourceCalculatorProcessTree.UNAVAILABLE) { - TimelineMetric memoryMetric = new TimelineMetric(); - memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId); - memoryMetric.addValue(currentTime, currentPmemUsage); - entity.addMetric(memoryMetric); - } - // if cpuUsageTotalCoresPercentage data is available - if (cpuUsageTotalCoresPercentage != - ResourceCalculatorProcessTree.UNAVAILABLE) { - TimelineMetric cpuMetric = new TimelineMetric(); - cpuMetric.setId(ContainerMetric.CPU.toString() + pId); - cpuMetric.addValue(currentTime, - cpuUsageTotalCoresPercentage); - entity.addMetric(cpuMetric); - } - } boolean isMemoryOverLimit = false; String msg = ""; @@ -632,23 +557,16 @@ public class ContainersMonitorImpl extends AbstractService implements LOG.info("Removed ProcessTree with root " + pId); } + ContainerImpl container = + (ContainerImpl) context.getContainers().get(containerId); + container.getNMTimelinePublisher().reportContainerResourceUsage( + container, currentTime, pId, currentPmemUsage, + cpuUsageTotalCoresPercentage); } catch (Exception e) { // Log the exception and proceed to the next container. LOG.warn("Uncaught exception in ContainersMonitorImpl " + "while monitoring resource of " + containerId, e); } - - if (publishContainerMetricsToTimelineService) { - try { - TimelineClient timelineClient = context.getApplications().get( - containerId.getApplicationAttemptId().getApplicationId()). 
- getTimelineClient(); - putEntityWithoutBlocking(timelineClient, entity); - } catch (Exception e) { - LOG.error("Exception in ContainersMonitorImpl in putting " + - "resource usage metrics to timeline service.", e); - } - } } if (LOG.isDebugEnabled()) { LOG.debug("Total Resource Usage stats in NM by all containers : " @@ -671,20 +589,6 @@ public class ContainersMonitorImpl extends AbstractService implements } } } - - private void putEntityWithoutBlocking(final TimelineClient timelineClient, - final TimelineEntity entity) { - Runnable publishWrapper = new Runnable() { - public void run() { - try { - timelineClient.putEntities(entity); - } catch (IOException|YarnException e) { - LOG.error("putEntityNonBlocking get failed: " + e); - } - } - }; - threadPool.execute(publishWrapper); - } private String formatErrorMessage(String memTypeExceeded, long currentVmemUsage, long vmemLimit, http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java new file mode 100644 index 0000000..af8d94c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> { + public NMTimelineEvent(NMTimelineEventType type) { + super(type); + } + + public NMTimelineEvent(NMTimelineEventType type, long timestamp) { + super(type, timestamp); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java new file mode 100644 index 0000000..c1129af --- /dev/null +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +public enum NMTimelineEventType { + // Publish the NM Timeline entity + TIMELINE_ENTITY_PUBLISH, +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java new file mode 100644 index 0000000..2c5c300 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +public class NMTimelinePublisher extends CompositeService { + + private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class); + + private Dispatcher dispatcher; + private boolean publishSystemMetrics; + + private Context context; + + private NodeId nodeId; + + private String httpAddress; + + public NMTimelinePublisher(Context context) { + super(NMTimelinePublisher.class.getName()); + this.context = context; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + publishSystemMetrics = + YarnConfiguration.systemMetricsPublisherEnabled(conf); + + if (publishSystemMetrics) { + dispatcher = new AsyncDispatcher(); + dispatcher.register(NMTimelineEventType.class, + new ForwardingEventHandler()); + dispatcher + .register(ContainerEventType.class, new ContainerEventHandler()); + dispatcher.register(ApplicationEventType.class, + new ApplicationEventHandler()); + dispatcher.register(LocalizationEventType.class, + new LocalizationEventDispatcher()); + addIfService(dispatcher); + LOG.info("YARN system metrics publishing service is enabled"); + } else { + LOG.info("YARN system metrics publishing service is not enabled"); + } + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + // context will be updated after containerManagerImpl is started + 
// hence NMMetricsPublisher is added subservice of containerManagerImpl + this.nodeId = context.getNodeId(); + this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort(); + } + + protected void handleNMTimelineEvent(NMTimelineEvent event) { + switch (event.getType()) { + case TIMELINE_ENTITY_PUBLISH: + putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(), + ((TimelinePublishEvent) event).getApplicationId()); + break; + default: + LOG.error("Unknown NMTimelineEvent type: " + event.getType()); + } + } + + @SuppressWarnings("unchecked") + public void reportContainerResourceUsage(Container container, + long createdTime, String pId, Long pmemUsage, + Float cpuUsageTotalCoresPercentage) { + if (publishSystemMetrics + && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) { + ContainerEntity entity = + createContainerEntity(container.getContainerId()); + long currentTimeMillis = System.currentTimeMillis(); + if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric memoryMetric = new TimelineMetric(); + memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId); + memoryMetric.addValue(currentTimeMillis, pmemUsage); + entity.addMetric(memoryMetric); + } + if (cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE) { + TimelineMetric cpuMetric = new TimelineMetric(); + cpuMetric.setId(ContainerMetric.CPU.toString() + pId); + cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage); + entity.addMetric(cpuMetric); + } + dispatcher.getEventHandler().handle( + new TimelinePublishEvent(entity, container.getContainerId() + .getApplicationAttemptId().getApplicationId())); + } + } + + private void publishContainerCreatedEvent(ContainerEntity entity, + ContainerId containerId, Resource resource, Priority priority, + long timestamp) { + Map<String, Object> entityInfo = new HashMap<String, Object>(); +
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + resource.getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + resource.getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + nodeId.getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + nodeId.getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + priority.toString()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + httpAddress); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(timestamp); + + entity.addEvent(tEvent); + putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); + } + + private void publishContainerFinishedEvent(ContainerStatus containerStatus, + long timeStamp) { + ContainerId containerId = containerStatus.getContainerId(); + TimelineEntity entity = createContainerEntity(containerId); + + Map<String, Object> eventInfo = new HashMap<String, Object>(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + containerStatus.getDiagnostics()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + containerStatus.getExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, containerStatus + .getState().toString()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(timeStamp); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, containerId.getApplicationAttemptId().getApplicationId()); + } + + private static ContainerEntity createContainerEntity(ContainerId containerId) { + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + Identifier parentIdentifier = new Identifier(); + parentIdentifier +
.setType(TimelineEntityType.YARN_APPLICATION_ATTEMPT.name()); + parentIdentifier.setId(containerId.getApplicationAttemptId().toString()); + entity.setParent(parentIdentifier); + return entity; + } + + private void putEntity(TimelineEntity entity, ApplicationId appId) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineClient timelineClient = + context.getApplications().get(appId).getTimelineClient(); + timelineClient.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + public void publishApplicationEvent(ApplicationEvent event) { + if (!publishSystemMetrics) { + return; + } + // publish only when the desired event is received + switch (event.getType()) { + case INIT_APPLICATION: + case FINISH_APPLICATION: + case APPLICATION_CONTAINER_FINISHED: + case APPLICATION_LOG_HANDLING_FAILED: + dispatcher.getEventHandler().handle(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired ApplicationEvent which needs to be published by" + + " NMTimelinePublisher"); + } + break; + } + } + + public void publishContainerEvent(ContainerEvent event) { + if (!publishSystemMetrics) { + return; + } + // publish only when the desired event is received + switch (event.getType()) { + case INIT_CONTAINER: + dispatcher.getEventHandler().handle(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired ContainerEvent which needs to be published by" + + " NMTimelinePublisher"); + } + break; + } + } + + public void publishLocalizationEvent(LocalizationEvent event) { + if (!publishSystemMetrics) { + return; + } + // publish only when the desired event is received + switch (event.getType()) { + case CONTAINER_RESOURCES_LOCALIZED: + case INIT_CONTAINER_RESOURCES: + 
dispatcher.getEventHandler().handle(event); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug(event.getType() + + " is not a desired LocalizationEvent which needs to be published" + + " by NMTimelinePublisher"); + } + break; + } + } + + private class ApplicationEventHandler implements + EventHandler<ApplicationEvent> { + @Override + public void handle(ApplicationEvent event) { + switch (event.getType()) { + case APPLICATION_CONTAINER_FINISHED: + // this is actually used to publish the container Event + ApplicationContainerFinishedEvent evnt = + (ApplicationContainerFinishedEvent) event; + publishContainerFinishedEvent(evnt.getContainerStatus(), + event.getTimestamp()); + break; + default: + LOG.error("Seems like event type is captured only in " + + "publishApplicationEvent method and not handled here"); + break; + } + } + } + + private class ContainerEventHandler implements EventHandler<ContainerEvent> { + @Override + public void handle(ContainerEvent event) { + ContainerId containerId = event.getContainerID(); + Container container = context.getContainers().get(containerId); + long timestamp = event.getTimestamp(); + ContainerEntity entity = createContainerEntity(containerId); + + switch (event.getType()) { + case INIT_CONTAINER: + publishContainerCreatedEvent(entity, containerId, + container.getResource(), container.getPriority(), timestamp); + break; + default: + LOG.error("Seems like event type is captured only in " + + "publishContainerEvent method and not handled here"); + break; + } + } + } + + private static final class LocalizationEventDispatcher implements + EventHandler<LocalizationEvent> { + @Override + public void handle(LocalizationEvent event) { + switch (event.getType()) { + case INIT_CONTAINER_RESOURCES: + case CONTAINER_RESOURCES_LOCALIZED: + // TODO after priority based flush jira is finished + break; + default: + LOG.error("Seems like event type is captured only in " + + "publishLocalizationEvent method and not handled here"); + break; + } + } + } + + /** + * EventHandler
implementation which forward events to NMMetricsPublisher. + * Making use of it, NMMetricsPublisher can avoid to have a public handle + * method. + */ + private final class ForwardingEventHandler implements + EventHandler<NMTimelineEvent> { + + @Override + public void handle(NMTimelineEvent event) { + handleNMTimelineEvent(event); + } + } + + private static class TimelinePublishEvent extends NMTimelineEvent { + private ApplicationId appId; + private TimelineEntity entityToPublish; + + public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) { + super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System + .currentTimeMillis()); + this.appId = appId; + this.entityToPublish = entity; + } + + public ApplicationId getApplicationId() { + return appId; + } + + public TimelineEntity getTimelineEntityToPublish() { + return entityToPublish; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 47f6df5..98bb827 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -250,8 +251,10 @@ public class TestNodeStatusUpdater { firstContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = new ContainerImpl(conf, mockDispatcher, - stateStore, launchContext, null, mockMetrics, containerToken); + Container container = + new ContainerImpl(conf, mockDispatcher, stateStore, launchContext, + null, mockMetrics, containerToken, + mock(Context.class)); this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -289,8 +292,10 @@ public class TestNodeStatusUpdater { secondContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = new ContainerImpl(conf, mockDispatcher, - stateStore, launchContext, null, mockMetrics, containerToken); + Container container = + new ContainerImpl(conf, mockDispatcher, stateStore, launchContext, + null, mockMetrics, containerToken, + mock(Context.class)); this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end @@ -977,7 +982,8 @@ public class TestNodeStatusUpdater { "password".getBytes(), 0); Container anyCompletedContainer = new ContainerImpl(conf, null, null, null, null, null, - BuilderUtils.newContainerTokenIdentifier(containerToken)) { + 
BuilderUtils.newContainerTokenIdentifier(containerToken), + mock(Context.class)) { @Override public ContainerState getCurrentState() { @@ -998,7 +1004,7 @@ public class TestNodeStatusUpdater { "password".getBytes(), 0); Container runningContainer = new ContainerImpl(conf, null, null, null, null, null, - BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) { + BuilderUtils.newContainerTokenIdentifier(runningContainerToken), mock(Context.class)) { @Override public ContainerState getCurrentState() { return ContainerState.RUNNING; @@ -1054,9 +1060,10 @@ public class TestNodeStatusUpdater { BuilderUtils.newContainerToken(containerId, "host", 1234, "user", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); + Container completedContainer = new ContainerImpl(conf, null, null, null, null, null, - BuilderUtils.newContainerTokenIdentifier(containerToken)) { + BuilderUtils.newContainerTokenIdentifier(containerToken),mock(Context.class)) { @Override public ContainerState getCurrentState() { return ContainerState.COMPLETE; @@ -1093,7 +1100,8 @@ public class TestNodeStatusUpdater { "password".getBytes(), 0); Container anyCompletedContainer = new ContainerImpl(conf, null, null, null, null, null, - BuilderUtils.newContainerTokenIdentifier(containerToken)) { + BuilderUtils.newContainerTokenIdentifier(containerToken), + mock(Context.class)) { @Override public ContainerState getCurrentState() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index bf4fd52..35f2d39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -688,5 +689,13 @@ public abstract class BaseAMRMProxyTest { return null; } + @Override + public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) { + } + + @Override + public NMTimelinePublisher getNMTimelinePublisher() { + return null; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 1380752..8c4bd25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.junit.Assert; @@ -193,7 +195,7 @@ public class TestAuxServices { ContainerId.newContainerId(attemptId, 1), "", "", Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0); Container container = new ContainerImpl(null, null, null, null, null, - null, cti); + null, cti, mock(Context.class)); ContainerId containerId = container.getContainerId(); Resource resource = container.getResource(); event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 31f4702..8b1ef67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -100,10 +100,12 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreServic
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestContainerManagerRecovery extends BaseContainerManagerTest {
 
@@ -633,6 +635,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
           boolean blockNewContainerRequests) {
         // do nothing
       }
+
+      @Override
+      public NMTimelinePublisher createNMTimelinePublisher(Context context) {
+        NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class);
+        return timelinePublisher;
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 002d4cf..38b3172f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -579,7 +582,7 @@ public class TestApplication {
 
     public void containerFinished(int containerNum) {
       app.handle(new ApplicationContainerFinishedEvent(containers.get(
-          containerNum).getContainerId()));
+          containerNum).cloneAndGetContainerStatus()));
       drainDispatcherEvents();
     }
 
@@ -616,6 +619,8 @@ public class TestApplication {
     when(c.getLaunchContext()).thenReturn(launchContext);
     when(launchContext.getApplicationACLs()).thenReturn(
         new HashMap());
+    when(c.cloneAndGetContainerStatus()).thenReturn(BuilderUtils.newContainerStatus(cId,
+        ContainerState.NEW, "", 0, Resource.newInstance(1024, 1)));
     return c;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 2834e30..aa22e5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -808,7 +808,7 @@ public class TestContainer {
       when(ctxt.getServiceData()).thenReturn(serviceData);
       c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
-          ctxt, null, metrics, identifier);
+          ctxt, null, metrics, identifier, mock(Context.class));
       dispatcher.register(ContainerEventType.class,
           new EventHandler() {
             @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 394a92c..37c726f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -144,4 +145,9 @@ public class MockContainer implements Container {
   public NMContainerStatus getNMContainerStatus() {
     return null;
   }
+
+  @Override
+  public Priority getPriority() {
+    return Priority.UNDEFINED;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
index ed94fb6..e839f47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
@@ -220,7 +220,8 @@ public class TestNMWebServer {
       Container container =
           new ContainerImpl(conf, dispatcher, stateStore, launchContext,
             null, metrics,
-            BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+            BuilderUtils.newContainerTokenIdentifier(containerToken),
+            mock(Context.class)) {
 
         @Override
         public ContainerState getContainerState() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index befaa83..4147d42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -137,9 +137,19 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
     // intercept the event of the AM container being stopped and remove the app
     // level collector service
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
-      ApplicationId appId = context.getContainerId().
-          getApplicationAttemptId().getApplicationId();
-      removeApplication(appId);
+      final ApplicationId appId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
+      new Thread(new Runnable() {
+        public void run() {
+          try {
+            // TODO Temporary Fix until solution for YARN-3995 is finalized.
+            Thread.sleep(1000l);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+          removeApplication(appId);
+        }
+      }).start();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fb80b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index 7cc612d..dafc76e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -98,6 +98,15 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerType()).thenReturn(
         ContainerType.APPLICATION_MASTER);
     auxService.stopContainer(context);
+
+    // TODO Temporary Fix until solution for YARN-3995 is finalized
+    for (int i = 0; i < 4; i++) {
+      Thread.sleep(500l);
+      if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
+        break;
+      }
+    }
+
     // auxService should not have that app
     assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
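
Note on the last hunk: because the collector is now removed on a background thread after a one-second sleep, the test polls hasApplication up to four times at 500 ms intervals before asserting. A minimal sketch of that bounded-polling pattern as a reusable helper is given here. BoundedPoll and pollUntil are hypothetical names, not part of Hadoop or of this commit. Unlike the patch, the sketch checks the condition before the first sleep and restores the interrupt flag instead of printing the stack trace.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of Hadoop: bounded polling for a condition. */
public class BoundedPoll {

  /**
   * Checks the condition up to maxAttempts times, sleeping sleepMillis after
   * each failed check, then checks once more. Returns true as soon as the
   * condition holds, false if it never does or the thread is interrupted.
   */
  public static boolean pollUntil(BooleanSupplier condition, int maxAttempts,
      long sleepMillis) {
    for (int i = 0; i < maxAttempts; i++) {
      if (condition.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe it, then give up.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    // Final check after the last sleep.
    return condition.getAsBoolean();
  }

  public static void main(String[] args) {
    final long deadline = System.currentTimeMillis() + 50L;
    // Stand-in for "!auxService.hasApplication(appId)": turns true after ~50 ms.
    boolean removed =
        pollUntil(() -> System.currentTimeMillis() >= deadline, 4, 100L);
    System.out.println("removed = " + removed);
  }
}
```

In the test above, the loop could then be expressed as a single call such as pollUntil(() -> !auxService.hasApplication(appAttemptId.getApplicationId()), 4, 500L), assuming a Java 8 source level.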