From: sjlee@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 12 Oct 2015 17:10:54 -0000
Subject: [14/50] [abbrv] hadoop git commit: YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d35d861c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d35d861c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d35d861c

Branch: refs/heads/YARN-2928-rebase
Commit: d35d861c7990e7680a32f1123c8a97ed932d45a4
Parents: ec0fd03
Author: Zhijie Shen
Authored: Sat Jun 13 11:32:41 2015 -0700
Committer: Sangjin Lee
Committed: Sat Oct 10 15:45:29 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                  |   3 +
 .../records/timelineservice/TimelineEntity.java  |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java      |  10 +
 .../distributedshell/TestDistributedShell.java   | 118 +++--
 .../metrics/ContainerMetricsConstants.java       |  10 +
 .../server/resourcemanager/ResourceManager.java  |   2 +-
 .../AbstractTimelineServicePublisher.java        | 184 ++++
 .../metrics/ApplicationFinishedEvent.java        |  13 +-
 .../metrics/SystemMetricsPublisher.java          | 458 ++++---------
 .../metrics/TimelineServiceV1Publisher.java      | 286 ++++++++++++
 .../metrics/TimelineServiceV2Publisher.java      | 318 +++++++++++++
 .../server/resourcemanager/rmapp/RMAppImpl.java  |   2 -
 .../TestRMAppLogAggregationStatus.java           |   2 +-
 .../metrics/TestSystemMetricsPublisher.java      |   5 +-
 .../TestSystemMetricsPublisherForV2.java         | 374 +++++++++++++
 .../storage/FileSystemTimelineWriterImpl.java    |   4 +-
 16 files changed, 1369 insertions(+), 423 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt
b/hadoop-yarn-project/CHANGES.txt index 8ff151a..22860ea 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3721. build is broken on YARN-2928 branch due to possible dependency cycle (Li Lu via sjlee) + YARN-3044. Made RM write app, attempt and optional container lifecycle + events to timeline service v2. (Naganarasimha G R via zjshen) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index defadec..a641f32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -470,4 +470,7 @@ public class TimelineEntity { return real == null ? this : real; } + public String toString() { + return identifier.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 348eadd..524203b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -392,6 +392,16 @@ public class YarnConfiguration extends Configuration { + "system-metrics-publisher.enabled"; public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false; + /** + * The setting that controls whether yarn container metrics is published to + * the timeline server or not by RM. 
This configuration setting is for ATS + * V2 + */ + public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX + + "rm.system-metrics-publisher.emit-container-events"; + public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED = + false; + public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size"; public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index cc5f5e2..d31f2c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -52,13 +52,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; - import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -107,6 +108,7 @@ public class TestDistributedShell { conf.set(YarnConfiguration.NM_AUX_SERVICES + "." 
+ TIMELINE_AUX_SERVICE_NAME + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); } + conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8"); conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); // Enable ContainersMonitorImpl @@ -373,48 +375,96 @@ public class TestDistributedShell { "/1/1/" : "/test_flow_name/test_flow_version/12345678/") + appId.toString(); // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - String outputDirApp = basePath + "/DS_APP_ATTEMPT/"; - - File entityFolder = new File(outputDirApp); - Assert.assertTrue(entityFolder.isDirectory()); + // Verify DS_APP_ATTEMPT entities posted by the client // there will be at least one attempt, look for that file - String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() - + "_000" + appId.getId() + "_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - String appAttemptFileName = outputDirApp + appTimestampFileName; - File appAttemptFile = new File(appAttemptFileName); - Assert.assertTrue(appAttemptFile.exists()); - - String outputDirContainer = basePath + "/DS_CONTAINER/"; - File containerFolder = new File(outputDirContainer); - Assert.assertTrue(containerFolder.isDirectory()); - - String containerTimestampFileName = "container_" - + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000002.thist"; - String containerFileName = outputDirContainer + containerTimestampFileName; - File containerFile = new File(containerFileName); - Assert.assertTrue(containerFile.exists()); + String appTimestampFileName = + "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT", + appTimestampFileName); + + // Verify DS_CONTAINER entities posted by the client + String containerTimestampFileName = + "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000002.thist"; + verifyEntityTypeFileExists(basePath, "DS_CONTAINER", + containerTimestampFileName); // Verify NM posting container metrics info. 
- String outputDirContainerMetrics = basePath + "/" + - TimelineEntityType.YARN_CONTAINER + "/"; - File containerMetricsFolder = new File(outputDirContainerMetrics); - Assert.assertTrue(containerMetricsFolder.isDirectory()); + String containerMetricsTimestampFileName = + "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_CONTAINER.toString(), + containerMetricsTimestampFileName); + + // Verify RM posting Application life cycle Events are getting published + String appMetricsTimestampFileName = + "application_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appEntityFile = + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_APPLICATION.toString(), + appMetricsTimestampFileName); + verifyStringExistsSpecifiedTimes(appEntityFile, + ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, + "Application created event should be published atleast once"); + verifyStringExistsSpecifiedTimes(appEntityFile, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, + "Application finished event should be published atleast once"); + + // Verify RM posting AppAttempt life cycle Events are getting published + String appAttemptMetricsTimestampFileName = + "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appAttemptEntityFile = + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), + appAttemptMetricsTimestampFileName); + verifyStringExistsSpecifiedTimes(appAttemptEntityFile, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, + "AppAttempt register event should be published atleast once"); + verifyStringExistsSpecifiedTimes(appAttemptEntityFile, + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, + "AppAttempt finished event should be published atleast once"); + } finally { + FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); + } + } - String containerMetricsTimestampFileName = "container_" - + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000001.thist"; - String containerMetricsFileName = outputDirContainerMetrics + - containerMetricsTimestampFileName; + private File verifyEntityTypeFileExists(String basePath, String entityType, + String entityfileName) { + String outputDirPathForEntity = basePath + "/" + entityType + "/"; + File outputDirForEntity = new File(outputDirPathForEntity); + Assert.assertTrue(outputDirForEntity.isDirectory()); - File containerMetricsFile = new File(containerMetricsFileName); - Assert.assertTrue(containerMetricsFile.exists()); + String entityFilePath = outputDirPathForEntity + entityfileName; + File entityFile = new File(entityFilePath); + Assert.assertTrue(entityFile.exists()); + return entityFile; + } + + private void verifyStringExistsSpecifiedTimes(File entityFile, + String searchString, long expectedNumOfTimes, String errorMsg) + throws IOException { + BufferedReader reader = null; + String strLine; + long actualCount = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().contains(searchString)) + actualCount++; + } } finally { - FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); + reader.close(); } + Assert.assertEquals(errorMsg, expectedNumOfTimes, 
actualCount); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java index 0d5540d..7b42994 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -27,10 +27,20 @@ public class ContainerMetricsConstants { public static final String ENTITY_TYPE = "YARN_CONTAINER"; + // Event of this type will be emitted by NM. public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED"; + // Event of this type will be emitted by RM. + public static final String CREATED_IN_RM_EVENT_TYPE = + "YARN_RM_CONTAINER_CREATED"; + + // Event of this type will be emitted by NM. public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED"; + // Event of this type will be emitted by RM. + public static final String FINISHED_IN_RM_EVENT_TYPE = + "YARN_RM_CONTAINER_FINISHED"; + public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT"; public static final String ALLOCATED_MEMORY_ENTITY_INFO = http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 29262ea..93049f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -373,7 +373,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected SystemMetricsPublisher createSystemMetricsPublisher() { - return new SystemMetricsPublisher(); + return new SystemMetricsPublisher(rmContext); } // sanity check for configurations http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java new file mode 100644 index 0000000..12145267 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher; + +public abstract class AbstractTimelineServicePublisher extends CompositeService + implements TimelineServicePublisher, EventHandler { + + private static final Log LOG = LogFactory + .getLog(TimelineServiceV2Publisher.class); + + private Configuration conf; + + public AbstractTimelineServicePublisher(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public void handle(SystemMetricsEvent event) { + switch (event.getType()) { + case APP_CREATED: + publishApplicationCreatedEvent((ApplicationCreatedEvent) event); + break; + case APP_FINISHED: + publishApplicationFinishedEvent((ApplicationFinishedEvent) event); + break; + case APP_UPDATED: + publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); + break; + case APP_ACLS_UPDATED: + publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); + break; + case APP_ATTEMPT_REGISTERED: + publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); + break; + case APP_ATTEMPT_FINISHED: + publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); + break; + case CONTAINER_CREATED: + 
publishContainerCreatedEvent((ContainerCreatedEvent) event); + break; + case CONTAINER_FINISHED: + publishContainerFinishedEvent((ContainerFinishedEvent) event); + break; + default: + LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); + } + } + + abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event); + + abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event); + + abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event); + + abstract void publishApplicationACLsUpdatedEvent( + ApplicationACLsUpdatedEvent event); + + abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event); + + abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event); + + abstract void publishContainerCreatedEvent(ContainerCreatedEvent event); + + abstract void publishContainerFinishedEvent(ContainerFinishedEvent event); + + @Override + public Dispatcher getDispatcher() { + MultiThreadedDispatcher dispatcher = + new MultiThreadedDispatcher( + conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + return dispatcher; + } + + @Override + public boolean publishRMContainerMetrics() { + return true; + } + + @Override + public EventHandler getEventHandler() { + return this; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static class MultiThreadedDispatcher extends CompositeService + implements Dispatcher { + + private List dispatchers = + new ArrayList(); + + public MultiThreadedDispatcher(int num) { + super(MultiThreadedDispatcher.class.getName()); + for (int i = 0; i < num; ++i) { + AsyncDispatcher dispatcher = createDispatcher(); + dispatchers.add(dispatcher); + addIfService(dispatcher); + } + } + + @Override + public EventHandler getEventHandler() { + return new CompositEventHandler(); + } + + @Override + public void register(Class eventType, EventHandler handler) { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.register(eventType, handler); + } + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Use hashCode (of ApplicationId) to dispatch the event to the child + // dispatcher, such that all the writing events of one application will + // be handled by one thread, the scheduled order of the these events + // will be preserved + int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); + dispatchers.get(index).getEventHandler().handle(event); + } + } + + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java index 8d75f92..d9241b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java @@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; public class ApplicationFinishedEvent extends @@ -33,6 +35,7 @@ public class ApplicationFinishedEvent extends private YarnApplicationState state; private ApplicationAttemptId latestAppAttemptId; private RMAppMetrics appMetrics; + private RMAppImpl app; public ApplicationFinishedEvent( ApplicationId appId, @@ -41,14 +44,16 @@ public class ApplicationFinishedEvent extends YarnApplicationState state, ApplicationAttemptId latestAppAttemptId, long finishedTime, - RMAppMetrics appMetrics) { + RMAppMetrics appMetrics, + RMAppImpl app) { super(SystemMetricsEventType.APP_FINISHED, finishedTime); this.appId = appId; this.diagnosticsInfo = diagnosticsInfo; this.appStatus = appStatus; this.latestAppAttemptId = latestAppAttemptId; this.state = state; - this.appMetrics=appMetrics; + this.appMetrics = appMetrics; + this.app = app; } @Override @@ -56,6 +61,10 @@ public class ApplicationFinishedEvent extends return appId.hashCode(); } + public RMAppImpl getApp() { + return app; + } + public ApplicationId getApplicationId() { return appId; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 2f56bd7..6ab7838 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -18,10 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,33 +25,24 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +import com.google.common.annotations.VisibleForTesting; /** - * The class that helps RM publish metrics to the timeline server V1. RM will + * The class that helps RM publish metrics to the timeline server. RM will * always invoke the methods of this class regardless the service is enabled or * not. If it is disabled, publishing requests will be ignored silently. 
*/ @@ -67,30 +54,38 @@ public class SystemMetricsPublisher extends CompositeService { .getLog(SystemMetricsPublisher.class); private Dispatcher dispatcher; - private TimelineClient client; - private boolean publishSystemMetricsToATSv1; + private boolean publishSystemMetrics; + private boolean publishContainerMetrics; + protected RMContext rmContext; - public SystemMetricsPublisher() { + public SystemMetricsPublisher(RMContext rmContext) { super(SystemMetricsPublisher.class.getName()); + this.rmContext = rmContext; } @Override protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetricsToATSv1 = + publishSystemMetrics = conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetricsToATSv1) { - client = TimelineClient.createTimelineClient(); - addIfService(client); - - dispatcher = createDispatcher(conf); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - addIfService(dispatcher); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + if (publishSystemMetrics) { + TimelineServicePublisher timelineServicePublisher = + getTimelineServicePublisher(conf); + if (timelineServicePublisher != null) { + addService(timelineServicePublisher); + // init required to be called so that other methods of + // TimelineServicePublisher can be utilized + timelineServicePublisher.init(conf); + dispatcher = createDispatcher(timelineServicePublisher); + publishContainerMetrics = + timelineServicePublisher.publishRMContainerMetrics(); + dispatcher.register(SystemMetricsEventType.class, + timelineServicePublisher.getEventHandler()); + addIfService(dispatcher); + } else { + LOG.info("TimelineServicePublisher is not configured"); + publishSystemMetrics = false; + } LOG.info("YARN system metrics publishing service is enabled"); } else { LOG.info("YARN system metrics publishing service is not enabled"); @@ -98,9 +93,26 @@ public class SystemMetricsPublisher extends CompositeService { super.serviceInit(conf); } + @VisibleForTesting + Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) { + return timelineServicePublisher.getDispatcher(); + } + + TimelineServicePublisher getTimelineServicePublisher(Configuration conf) { + if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + return new TimelineServiceV1Publisher(); + } else if (conf.getBoolean( + YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + return new TimelineServiceV2Publisher(rmContext); + } + return null; + } + @SuppressWarnings("unchecked") public void appCreated(RMApp app, long createdTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { ApplicationSubmissionContext appSubmissionContext = app.getApplicationSubmissionContext(); dispatcher.getEventHandler().handle( @@ -121,7 +133,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appUpdated(RMApp app, long updatedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler() .handle( new ApplicationUpdatedEvent(app.getApplicationId(), app @@ -132,7 +144,7 @@ public class SystemMetricsPublisher extends CompositeService { 
@SuppressWarnings("unchecked") public void appFinished(RMApp app, RMAppState state, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationFinishedEvent( app.getApplicationId(), @@ -142,14 +154,15 @@ public class SystemMetricsPublisher extends CompositeService { app.getCurrentAppAttempt() == null ? null : app.getCurrentAppAttempt().getAppAttemptId(), finishedTime, - app.getRMAppMetrics())); + app.getRMAppMetrics(), + (RMAppImpl)app)); } } @SuppressWarnings("unchecked") public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationACLsUpdatedEvent( app.getApplicationId(), @@ -161,7 +174,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new AppAttemptRegisteredEvent( appAttempt.getAppAttemptId(), @@ -177,7 +190,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appAttemptFinished(RMAppAttempt appAttempt, RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new AppAttemptFinishedEvent( appAttempt.getAppAttemptId(), @@ -194,7 +207,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void containerCreated(RMContainer container, long createdTime) { - if (publishSystemMetricsToATSv1) { + if (publishContainerMetrics) { dispatcher.getEventHandler().handle( new ContainerCreatedEvent( container.getContainerId(), @@ -207,7 +220,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void containerFinished(RMContainer container, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishContainerMetrics) { dispatcher.getEventHandler().handle( new ContainerFinishedEvent( container.getContainerId(), @@ -218,344 +231,31 @@ public class SystemMetricsPublisher extends CompositeService { } } - protected Dispatcher createDispatcher(Configuration conf) { - MultiThreadedDispatcher dispatcher = - new MultiThreadedDispatcher( - conf.getInt( - YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); - dispatcher.setDrainEventsOnStop(); - return dispatcher; - } - - protected void handleSystemMetricsEvent( - SystemMetricsEvent event) { - switch (event.getType()) { - case APP_CREATED: - publishApplicationCreatedEvent((ApplicationCreatedEvent) event); - break; - case APP_FINISHED: - publishApplicationFinishedEvent((ApplicationFinishedEvent) event); - break; - case APP_ACLS_UPDATED: - publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); - break; - case APP_UPDATED: - publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); - break; - case APP_ATTEMPT_REGISTERED: - publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); - break; - case APP_ATTEMPT_FINISHED: - publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); - break; - case CONTAINER_CREATED: - publishContainerCreatedEvent((ContainerCreatedEvent) event); - break; - case CONTAINER_FINISHED: - 
publishContainerFinishedEvent((ContainerFinishedEvent) event); - break; - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } - } - - private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); - entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); - entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - event.getSubmittedTime()); - entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, - event.getAppTags()); - entityInfo.put( - ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, - event.isUnmanagedApp()); - entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - event.getApplicationPriority().getPriority()); - entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, - event.getAppNodeLabelsExpression()); - entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, - event.getAmNodeLabelsExpression()); - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { - eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); - } - RMAppMetrics appMetrics = event.getAppMetrics(); - entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, - appMetrics.getVcoreSeconds()); - entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, - appMetrics.getMemorySeconds()); - - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event - .getApplicationPriority().getPriority()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); + @VisibleForTesting + boolean isPublishContainerMetrics() { + return publishContainerMetrics; } - 
private void publishApplicationACLsUpdatedEvent( - ApplicationACLsUpdatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); - entity.setOtherInfo(entityInfo); - tEvent.setEventType( - ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createApplicationEntity( - ApplicationId applicationId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); - entity.setEntityId(applicationId.toString()); - return entity; - } - - private void - publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, - event.getHost()); - eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - eventInfo.put( - AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationAttemptState().toString()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - AppAttemptMetricsConstants.ENTITY_TYPE); - entity.setEntityId(appAttemptId.toString()); - entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, - appAttemptId.getApplicationId().toString()); - return entity; - } - - private void publishContainerCreatedEvent(ContainerCreatedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, - event.getAllocatedResource().getMemory()); - 
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, - event.getAllocatedResource().getVirtualCores()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - entityInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, - event.getContainerState().toString()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createContainerEntity( - ContainerId containerId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - ContainerMetricsConstants.ENTITY_TYPE); - entity.setEntityId(containerId.toString()); - entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, - containerId.getApplicationAttemptId().toString()); - return entity; - } - - private void putEntity(TimelineEntity entity) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Publishing the entity " + entity.getEntityId() + - ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - client.putEntities(entity); - } catch (Exception e) { - LOG.error("Error when publishing entity [" + entity.getEntityType() + "," - + entity.getEntityId() + "]", e); - } + @VisibleForTesting + Dispatcher getDispatcher() { + return dispatcher; } - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. 
- */ - private final class ForwardingEventHandler implements - EventHandler { - - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); - } - + interface TimelineServicePublisher extends Service { + /** + * @return the Dispatcher which needs to be used to dispatch events + */ + Dispatcher getDispatcher(); + + /** + * @return true if RMContainerMetricsNeeds to be sent + */ + boolean publishRMContainerMetrics(); + + /** + * @return EventHandler which needs to be registered to the dispatcher to + * handle the SystemMetricsEvent + */ + EventHandler getEventHandler(); } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected static class MultiThreadedDispatcher extends CompositeService - implements Dispatcher { - - private List dispatchers = - new ArrayList(); - - public MultiThreadedDispatcher(int num) { - super(MultiThreadedDispatcher.class.getName()); - for (int i = 0; i < num; ++i) { - AsyncDispatcher dispatcher = createDispatcher(); - dispatchers.add(dispatcher); - addIfService(dispatcher); - } - } - - @Override - public EventHandler getEventHandler() { - return new CompositEventHandler(); - } - - @Override - public void register(Class eventType, EventHandler handler) { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.register(eventType, handler); - } - } - - public void setDrainEventsOnStop() { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.setDrainEventsOnStop(); - } - } - - private class CompositEventHandler implements EventHandler { - - @Override - public void handle(Event event) { - // Use hashCode (of ApplicationId) to dispatch the event to the child - // dispatcher, such that all the writing events of one application will - // be handled by one thread, the scheduled order of the these events - // will be preserved - int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); - dispatchers.get(index).getEventHandler().handle(event); - } - - } - - protected AsyncDispatcher createDispatcher() { - return new AsyncDispatcher(); - } - - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java new file mode 100644 index 0000000..0dd1bca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +public class TimelineServiceV1Publisher extends + AbstractTimelineServicePublisher { + + private static final Log LOG = LogFactory + .getLog(TimelineServiceV1Publisher.class); + + public TimelineServiceV1Publisher() { + super("TimelineserviceV1Publisher"); + } + + private TimelineClient client; + + @Override + public void serviceInit(Configuration conf) throws Exception { + client = TimelineClient.createTimelineClient(); + addIfService(client); + super.serviceInit(conf); + } + + @Override + void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, + event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + event.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, + event.getUser()); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, + event.getAppTags()); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + event.isUnmanagedApp()); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + event.getApplicationPriority().getPriority()); + entity.setOtherInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + 
event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + RMAppMetrics appMetrics = event.getAppMetrics(); + entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event + .getApplicationPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + + TimelineEvent tEvent = new TimelineEvent(); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + event.getViewAppACLs()); + entity.setOtherInfo(entityInfo); + tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createApplicationEntity( + ApplicationId applicationId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); + entity.setEntityId(applicationId.toString()); + return entity; + } + + @Override + void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event + .getMasterContainerId().toString()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + 
Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationAttemptState().toString()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appAttemptId.toString()); + entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, + appAttemptId.getApplicationId().toString()); + return entity; + } + + @Override + void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + event.getAllocatedResource().getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event + .getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event + .getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event + .getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + event.getNodeHttpAddress()); + entity.setOtherInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event + .getContainerState().toString()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createContainerEntity(ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE); + entity.setEntityId(containerId.toString()); + entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + containerId.getApplicationAttemptId().toString()); + return entity; + } + + private void putEntity(TimelineEntity entity) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity.getEntityId() + + ", JSON-style content: " + + 
TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + client.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity [" + entity.getEntityType() + "," + + entity.getEntityId() + "]", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java new file mode 100644 index 0000000..033e0c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -0,0 +1,318 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * This class is responsible for posting application, appattempt & Container + * lifecycle related events to timeline service V2 + */ +@Private +@Unstable +public class TimelineServiceV2Publisher extends + AbstractTimelineServicePublisher { + private static final Log LOG = LogFactory + .getLog(TimelineServiceV2Publisher.class); + protected RMTimelineCollectorManager rmTimelineCollectorManager; + + public TimelineServiceV2Publisher(RMContext rmContext) { + super("TimelineserviceV2Publisher"); + rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager(); + } + + private boolean publishContainerMetrics; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + publishContainerMetrics = + conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED); + super.serviceInit(conf); + } + + @Override + void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + entity.setQueue(event.getQueue()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, + event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + event.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, + event.getUser()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + 
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, + event.getAppTags()); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + event.isUnmanagedApp()); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + event.getApplicationPriority().getPriority()); + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + + putEntity(entity, event.getApplicationId()); + } + + @Override + void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + RMAppMetrics appMetrics = event.getAppMetrics(); + entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationId()); + + //cleaning up the collector cached + event.getApp().stopTimelineCollector(); + } + + @Override + void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event + .getApplicationPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationId()); + } + + @Override + void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + event.getViewAppACLs()); + entity.setInfo(entityInfo); + + putEntity(entity, event.getApplicationId()); + } + + private static ApplicationEntity createApplicationEntity( + ApplicationId applicationId) { + ApplicationEntity entity = new ApplicationEntity(); + entity.setId(applicationId.toString()); + return entity; + } + + @Override + void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + 
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event + .getMasterContainerId().toString()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + } + + @Override + void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + ApplicationAttemptEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationAttemptState().toString()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + } + + @Override + void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + // updated as event info instead of entity info, as entity info is updated + // by NM + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event + .getAllocatedResource().getMemory()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event + .getAllocatedResource().getVirtualCores()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event + .getAllocatedNode().getHost()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event + .getAllocatedNode().getPort()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + eventInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + event.getNodeHttpAddress()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getContainerId().getApplicationAttemptId() + .getApplicationId()); + } + + @Override + void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + 
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event + .getContainerState().toString()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getContainerId().getApplicationAttemptId() + .getApplicationId()); + } + + private static ContainerEntity createContainerEntity(ContainerId containerId) { + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT + .name(), containerId.getApplicationAttemptId().toString())); + return entity; + } + + private void putEntity(TimelineEntity entity, ApplicationId appId) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineCollector timelineCollector = + rmTimelineCollectorManager.get(appId); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + timelineCollector.putEntities(entities, + UserGroupInformation.getCurrentUser()); + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + private static ApplicationAttemptEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); + entity.setId(appAttemptId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), + appAttemptId.getApplicationId().toString())); + return entity; + } + + @Override + public boolean publishRMContainerMetrics() { + return publishContainerMetrics; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 6bd41e2..7c1d375 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1337,8 +1337,6 @@ public class RMAppImpl implements RMApp, Recoverable { .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); - - app.stopTimelineCollector(); }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index ec71846..d0bd68f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -89,7 +89,7 @@ public class TestRMAppLogAggregationStatus { rmContext = new RMContextImpl(rmDispatcher, null, null, null, null, null, null, null, null); - rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher()); + rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); rmContext http://git-wip-us.apache.org/repos/asf/hadoop/blob/d35d861c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 5965ecb..0418f37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -74,7 +75,7 @@ public class TestSystemMetricsPublisher { public static void setup() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, MemoryTimelineStore.class, TimelineStore.class); conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, @@ -88,7 +89,7 @@ public class TestSystemMetricsPublisher { timelineServer.start(); store = timelineServer.getTimelineStore(); - metricsPublisher = new SystemMetricsPublisher(); + metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); 
metricsPublisher.init(conf);
     metricsPublisher.start();
   }
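
For readers following the patch, a minimal sketch of the write path the new TimelineServiceV2Publisher introduces, and of the container-event switch it reads in serviceInit(). The helper class, package, and the "example-user" value below are hypothetical and not part of this commit; only the YARN types and calls shown in the diff above (RMTimelineCollectorManager.get, TimelineCollector.putEntities, ApplicationEntity, TimelineEvent, the ApplicationMetricsConstants keys, and the RM_PUBLISH_CONTAINER_METRICS_ENABLED setting) are taken from the patch itself.

package org.example.timeline; // hypothetical package, for illustration only

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;

/** Illustrative sketch of the v2 publish flow; not part of the patch. */
public final class TimelineV2PublishSketch {

  private TimelineV2PublishSketch() {
  }

  /**
   * Publishes a minimal "application created" entity for appId through the
   * per-application collector, mirroring putEntity() in
   * TimelineServiceV2Publisher above.
   */
  public static void publishAppCreated(
      RMTimelineCollectorManager collectorManager, ApplicationId appId,
      String queue, long timestamp) throws Exception {
    ApplicationEntity entity = new ApplicationEntity();
    entity.setId(appId.toString());
    entity.setQueue(queue);

    // Event info keyed by the same constants the publisher uses.
    Map<String, Object> info = new HashMap<String, Object>();
    info.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "example-user"); // hypothetical value

    TimelineEvent event = new TimelineEvent();
    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
    event.setTimestamp(timestamp);
    event.setInfo(info);
    entity.addEvent(event);

    TimelineEntities entities = new TimelineEntities();
    entities.addEntity(entity);

    // The collector is per-application; the RM registers it for the app,
    // and the publisher looks it up by ApplicationId before writing.
    TimelineCollector collector = collectorManager.get(appId);
    collector.putEntities(entities, UserGroupInformation.getCurrentUser());
  }

  /** Reads the new switch that gates RM container lifecycle events. */
  public static boolean containerMetricsEnabled(Configuration conf) {
    return conf.getBoolean(
        YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
        YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
  }
}

The design point the sketch highlights: unlike the v1 publisher, which writes through a single TimelineClient, the v2 publisher routes every entity through the collector registered for that application, which is why putEntity() in the v2 class takes an ApplicationId and why the finished-event handler stops the app's collector once the final entity has been posted.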