hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject [33/50] [abbrv] hadoop git commit: YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.
Date Thu, 13 Aug 2015 21:20:24 GMT
YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

(cherry picked from commit 17842a3f61ed33ec831a889c11a74d4814be73ec)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bdb0a7b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bdb0a7b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bdb0a7b8

Branch: refs/heads/YARN-2928-new
Commit: bdb0a7b8d9c6a8d9d3f118afd721bc32de749be2
Parents: bcc1623
Author: Zhijie Shen <zjshen@apache.org>
Authored: Sat Jun 13 11:32:41 2015 -0700
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Thu Aug 13 13:53:32 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../records/timelineservice/TimelineEntity.java |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../distributedshell/TestDistributedShell.java  | 118 +++--
 .../metrics/ContainerMetricsConstants.java      |  10 +
 .../server/resourcemanager/ResourceManager.java |   2 +-
 .../AbstractTimelineServicePublisher.java       | 179 ++++++++
 .../metrics/ApplicationFinishedEvent.java       |  13 +-
 .../metrics/SystemMetricsPublisher.java         | 431 ++++---------------
 .../metrics/TimelineServiceV1Publisher.java     | 265 ++++++++++++
 .../metrics/TimelineServiceV2Publisher.java     | 296 +++++++++++++
 .../server/resourcemanager/rmapp/RMAppImpl.java |   2 -
 .../TestRMAppLogAggregationStatus.java          |   4 +-
 .../metrics/TestSystemMetricsPublisher.java     |   8 +-
 .../TestSystemMetricsPublisherForV2.java        | 374 ++++++++++++++++
 .../storage/FileSystemTimelineWriterImpl.java   |   4 +-
 16 files changed, 1321 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 57eadd5..808fa70 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3721. build is broken on YARN-2928 branch due to possible dependency
     cycle (Li Lu via sjlee)
 
+    YARN-3044. Made RM write app, attempt and optional container lifecycle
+    events to timeline service v2. (Naganarasimha G R via zjshen)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index defadec..a641f32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -470,4 +470,7 @@ public class TimelineEntity {
     return real == null ? this : real;
   }
 
+  public String toString() {
+    return identifier.toString();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5553ab4..3e9833e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -388,6 +388,16 @@ public class YarnConfiguration extends Configuration {
       + "system-metrics-publisher.enabled";
   public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
 
+  /**
+   * The setting that controls whether yarn container metrics is published to
+   * the timeline server or not by RM. This configuration setting is for ATS
+   * V2
+   */
+  public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX
+      + "rm.system-metrics-publisher.emit-container-events";
+  public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED =
+      false;
+
   public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
   public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index cc5f5e2..d31f2c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -52,13 +52,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -107,6 +108,7 @@ public class TestDistributedShell {
       conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
         + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
     }
+    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     // Enable ContainersMonitorImpl
@@ -373,48 +375,96 @@ public class TestDistributedShell {
               "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
               appId.toString();
       // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
-      String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
-
-      File entityFolder = new File(outputDirApp);
-      Assert.assertTrue(entityFolder.isDirectory());
 
+      // Verify DS_APP_ATTEMPT entities posted by the client
       // there will be at least one attempt, look for that file
-      String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
-          + "_000" + appId.getId() + "_000001"
-          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      String appAttemptFileName = outputDirApp + appTimestampFileName;
-      File appAttemptFile = new File(appAttemptFileName);
-      Assert.assertTrue(appAttemptFile.exists());
-
-      String outputDirContainer = basePath + "/DS_CONTAINER/";
-      File containerFolder = new File(outputDirContainer);
-      Assert.assertTrue(containerFolder.isDirectory());
-
-      String containerTimestampFileName = "container_"
-          + appId.getClusterTimestamp() + "_000" + appId.getId()
-          + "_01_000002.thist";
-      String containerFileName = outputDirContainer + containerTimestampFileName;
-      File containerFile = new File(containerFileName);
-      Assert.assertTrue(containerFile.exists());
+      String appTimestampFileName =
+          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_000001"
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
+          appTimestampFileName);
+
+      // Verify DS_CONTAINER entities posted by the client
+      String containerTimestampFileName =
+          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_01_000002.thist";
+      verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
+          containerTimestampFileName);
 
       // Verify NM posting container metrics info.
-      String outputDirContainerMetrics = basePath + "/" + 
-          TimelineEntityType.YARN_CONTAINER + "/";
-      File containerMetricsFolder = new File(outputDirContainerMetrics);
-      Assert.assertTrue(containerMetricsFolder.isDirectory());
+      String containerMetricsTimestampFileName =
+          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_01_000001"
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_CONTAINER.toString(),
+          containerMetricsTimestampFileName);
+
+      // Verify RM posting Application life cycle Events are getting published
+      String appMetricsTimestampFileName =
+          "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File appEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION.toString(),
+              appMetricsTimestampFileName);
+      verifyStringExistsSpecifiedTimes(appEntityFile,
+          ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1,
+          "Application created event should be published atleast once");
+      verifyStringExistsSpecifiedTimes(appEntityFile,
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1,
+          "Application finished event should be published atleast once");
+
+      // Verify RM posting AppAttempt life cycle Events are getting published
+      String appAttemptMetricsTimestampFileName =
+          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_000001"
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File appAttemptEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+              appAttemptMetricsTimestampFileName);
+      verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1,
+          "AppAttempt register event should be published atleast once");
+      verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
+          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1,
+          "AppAttempt finished event should be published atleast once");
+    } finally {
+      FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+    }
+  }
 
-      String containerMetricsTimestampFileName = "container_"
-        + appId.getClusterTimestamp() + "_000" + appId.getId()
-        + "_01_000001.thist";
-      String containerMetricsFileName = outputDirContainerMetrics + 
-        containerMetricsTimestampFileName;
+  private File verifyEntityTypeFileExists(String basePath, String entityType,
+      String entityfileName) {
+    String outputDirPathForEntity = basePath + "/" + entityType + "/";
+    File outputDirForEntity = new File(outputDirPathForEntity);
+    Assert.assertTrue(outputDirForEntity.isDirectory());
 
-      File containerMetricsFile = new File(containerMetricsFileName);
-      Assert.assertTrue(containerMetricsFile.exists());
+    String entityFilePath = outputDirPathForEntity + entityfileName;
 
+    File entityFile = new File(entityFilePath);
+    Assert.assertTrue(entityFile.exists());
+    return entityFile;
+  }
+
+  private void verifyStringExistsSpecifiedTimes(File entityFile,
+      String searchString, long expectedNumOfTimes, String errorMsg)
+      throws IOException {
+    BufferedReader reader = null;
+    String strLine;
+    long actualCount = 0;
+    try {
+      reader = new BufferedReader(new FileReader(entityFile));
+      while ((strLine = reader.readLine()) != null) {
+        if (strLine.trim().contains(searchString))
+          actualCount++;
+      }
     } finally {
-      FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+      reader.close();
     }
+    Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 0d5540d..7b42994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -27,10 +27,20 @@ public class ContainerMetricsConstants {
 
   public static final String ENTITY_TYPE = "YARN_CONTAINER";
 
+  // Event of this type will be emitted by NM.
   public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED";
 
+  // Event of this type will be emitted by RM.
+  public static final String CREATED_IN_RM_EVENT_TYPE =
+      "YARN_RM_CONTAINER_CREATED";
+
+  // Event of this type will be emitted by NM.
   public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
 
+  // Event of this type will be emitted by RM.
+  public static final String FINISHED_IN_RM_EVENT_TYPE =
+      "YARN_RM_CONTAINER_FINISHED";
+
   public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
 
   public static final String ALLOCATED_MEMORY_ENTITY_INFO =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 525c88b..70c0014 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -373,7 +373,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
-    return new SystemMetricsPublisher(); 
+    return new SystemMetricsPublisher(rmContext);
   }
 
   // sanity check for configurations

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
new file mode 100644
index 0000000..0d66327
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
+
+public abstract class AbstractTimelineServicePublisher extends CompositeService
+    implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
+
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV2Publisher.class);
+
+  private Configuration conf;
+
+  public AbstractTimelineServicePublisher(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(SystemMetricsEvent event) {
+    switch (event.getType()) {
+    case APP_CREATED:
+      publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
+      break;
+    case APP_FINISHED:
+      publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
+      break;
+    case APP_ACLS_UPDATED:
+      publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
+      break;
+    case APP_ATTEMPT_REGISTERED:
+      publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
+      break;
+    case APP_ATTEMPT_FINISHED:
+      publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
+      break;
+    case CONTAINER_CREATED:
+      publishContainerCreatedEvent((ContainerCreatedEvent) event);
+      break;
+    case CONTAINER_FINISHED:
+      publishContainerFinishedEvent((ContainerFinishedEvent) event);
+      break;
+    default:
+      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
+    }
+  }
+
+  abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
+
+  abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
+
+  abstract void publishApplicationACLsUpdatedEvent(
+      ApplicationACLsUpdatedEvent event);
+
+  abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
+
+  abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
+
+  abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
+
+  abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
+
+  @Override
+  public Dispatcher getDispatcher() {
+    MultiThreadedDispatcher dispatcher =
+        new MultiThreadedDispatcher(
+            conf.getInt(
+                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+    dispatcher.setDrainEventsOnStop();
+    return dispatcher;
+  }
+
+  @Override
+  public boolean publishRMContainerMetrics() {
+    return true;
+  }
+
+  @Override
+  public EventHandler<SystemMetricsEvent> getEventHandler() {
+    return this;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static class MultiThreadedDispatcher extends CompositeService
+      implements Dispatcher {
+
+    private List<AsyncDispatcher> dispatchers =
+        new ArrayList<AsyncDispatcher>();
+
+    public MultiThreadedDispatcher(int num) {
+      super(MultiThreadedDispatcher.class.getName());
+      for (int i = 0; i < num; ++i) {
+        AsyncDispatcher dispatcher = createDispatcher();
+        dispatchers.add(dispatcher);
+        addIfService(dispatcher);
+      }
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return new CompositEventHandler();
+    }
+
+    @Override
+    public void register(Class<? extends Enum> eventType, EventHandler handler) {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.register(eventType, handler);
+      }
+    }
+
+    public void setDrainEventsOnStop() {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.setDrainEventsOnStop();
+      }
+    }
+
+    private class CompositEventHandler implements EventHandler<Event> {
+
+      @Override
+      public void handle(Event event) {
+        // Use hashCode (of ApplicationId) to dispatch the event to the child
+        // dispatcher, such that all the writing events of one application will
+        // be handled by one thread, the scheduled order of the these events
+        // will be preserved
+        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
+        dispatchers.get(index).getEventHandler().handle(event);
+      }
+    }
+
+    protected AsyncDispatcher createDispatcher() {
+      return new AsyncDispatcher();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
index 8d75f92..d9241b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 
 public class ApplicationFinishedEvent extends
@@ -33,6 +35,7 @@ public class ApplicationFinishedEvent extends
   private YarnApplicationState state;
   private ApplicationAttemptId latestAppAttemptId;
   private RMAppMetrics appMetrics;
+  private RMAppImpl app;
 
   public ApplicationFinishedEvent(
       ApplicationId appId,
@@ -41,14 +44,16 @@ public class ApplicationFinishedEvent extends
       YarnApplicationState state,
       ApplicationAttemptId latestAppAttemptId,
       long finishedTime,
-      RMAppMetrics appMetrics) {
+      RMAppMetrics appMetrics,
+      RMAppImpl app) {
     super(SystemMetricsEventType.APP_FINISHED, finishedTime);
     this.appId = appId;
     this.diagnosticsInfo = diagnosticsInfo;
     this.appStatus = appStatus;
     this.latestAppAttemptId = latestAppAttemptId;
     this.state = state;
-    this.appMetrics=appMetrics;
+    this.appMetrics = appMetrics;
+    this.app = app;
   }
 
   @Override
@@ -56,6 +61,10 @@ public class ApplicationFinishedEvent extends
     return appId.hashCode();
   }
 
+  public RMAppImpl getApp() {
+    return app;
+  }
+
   public ApplicationId getApplicationId() {
     return appId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index b4ce4f9..1b8cce3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,31 +25,23 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
- * The class that helps RM publish metrics to the timeline server V1. RM will
+ * The class that helps RM publish metrics to the timeline server. RM will
  * always invoke the methods of this class regardless the service is enabled or
  * not. If it is disabled, publishing requests will be ignored silently.
  */
@@ -65,30 +53,38 @@ public class SystemMetricsPublisher extends CompositeService {
       .getLog(SystemMetricsPublisher.class);
 
   private Dispatcher dispatcher;
-  private TimelineClient client;
-  private boolean publishSystemMetricsToATSv1;
+  private boolean publishSystemMetrics;
+  private boolean publishContainerMetrics;
+  protected RMContext rmContext;
 
-  public SystemMetricsPublisher() {
+  public SystemMetricsPublisher(RMContext rmContext) {
     super(SystemMetricsPublisher.class.getName());
+    this.rmContext = rmContext;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetricsToATSv1 =
+    publishSystemMetrics =
         conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-            && conf.getBoolean(
-                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
-    if (publishSystemMetricsToATSv1) {
-      client = TimelineClient.createTimelineClient();
-      addIfService(client);
-
-      dispatcher = createDispatcher(conf);
-      dispatcher.register(SystemMetricsEventType.class,
-          new ForwardingEventHandler());
-      addIfService(dispatcher);
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+    if (publishSystemMetrics) {
+      TimelineServicePublisher timelineServicePublisher =
+          getTimelineServicePublisher(conf);
+      if (timelineServicePublisher != null) {
+        addService(timelineServicePublisher);
+        // init required to be called so that other methods of
+        // TimelineServicePublisher can be utilized
+        timelineServicePublisher.init(conf);
+        dispatcher = createDispatcher(timelineServicePublisher);
+        publishContainerMetrics =
+            timelineServicePublisher.publishRMContainerMetrics();
+        dispatcher.register(SystemMetricsEventType.class,
+            timelineServicePublisher.getEventHandler());
+        addIfService(dispatcher);
+      } else {
+        LOG.info("TimelineServicePublisher is not configured");
+        publishSystemMetrics = false;
+      }
       LOG.info("YARN system metrics publishing service is enabled");
     } else {
       LOG.info("YARN system metrics publishing service is not enabled");
@@ -96,9 +92,26 @@ public class SystemMetricsPublisher extends CompositeService {
     super.serviceInit(conf);
   }
 
+  @VisibleForTesting
+  Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
+    return timelineServicePublisher.getDispatcher();
+  }
+
+  TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
+    if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
+      return new TimelineServiceV1Publisher();
+    } else if (conf.getBoolean(
+        YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
+      return new TimelineServiceV2Publisher(rmContext);
+    }
+    return null;
+  }
+
   @SuppressWarnings("unchecked")
   public void appCreated(RMApp app, long createdTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new ApplicationCreatedEvent(
               app.getApplicationId(),
@@ -115,7 +128,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appFinished(RMApp app, RMAppState state, long finishedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new ApplicationFinishedEvent(
               app.getApplicationId(),
@@ -125,14 +138,15 @@ public class SystemMetricsPublisher extends CompositeService {
               app.getCurrentAppAttempt() == null ?
                   null : app.getCurrentAppAttempt().getAppAttemptId(),
               finishedTime,
-              app.getRMAppMetrics()));
+              app.getRMAppMetrics(),
+              (RMAppImpl)app));
     }
   }
 
   @SuppressWarnings("unchecked")
   public void appACLsUpdated(RMApp app, String appViewACLs,
       long updatedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new ApplicationACLsUpdatedEvent(
               app.getApplicationId(),
@@ -144,7 +158,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptRegistered(RMAppAttempt appAttempt,
       long registeredTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new AppAttemptRegisteredEvent(
               appAttempt.getAppAttemptId(),
@@ -160,7 +174,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptFinished(RMAppAttempt appAttempt,
       RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new AppAttemptFinishedEvent(
               appAttempt.getAppAttemptId(),
@@ -177,7 +191,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerCreated(RMContainer container, long createdTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishContainerMetrics) {
       dispatcher.getEventHandler().handle(
           new ContainerCreatedEvent(
               container.getContainerId(),
@@ -190,7 +204,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerFinished(RMContainer container, long finishedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishContainerMetrics) {
       dispatcher.getEventHandler().handle(
           new ContainerFinishedEvent(
               container.getContainerId(),
@@ -201,322 +215,31 @@ public class SystemMetricsPublisher extends CompositeService {
     }
   }
 
-  protected Dispatcher createDispatcher(Configuration conf) {
-    MultiThreadedDispatcher dispatcher =
-        new MultiThreadedDispatcher(
-            conf.getInt(
-                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
-                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
-    dispatcher.setDrainEventsOnStop();
-    return dispatcher;
-  }
-
-  protected void handleSystemMetricsEvent(
-      SystemMetricsEvent event) {
-    switch (event.getType()) {
-      case APP_CREATED:
-        publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
-        break;
-      case APP_FINISHED:
-        publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
-        break;
-      case APP_ACLS_UPDATED:
-        publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
-        break;
-      case APP_ATTEMPT_REGISTERED:
-        publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
-        break;
-      case APP_ATTEMPT_FINISHED:
-        publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
-        break;
-      case CONTAINER_CREATED:
-        publishContainerCreatedEvent((ContainerCreatedEvent) event);
-        break;
-      case CONTAINER_FINISHED:
-        publishContainerFinishedEvent((ContainerFinishedEvent) event);
-        break;
-      default:
-        LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
-    }
-  }
-
-  private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
-    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
-    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
-    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
-    entityInfo.put(
-        ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-        event.isUnmanagedApp());
-    entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-        event.getApplicationPriority().getPriority());
-    entity.setOtherInfo(entityInfo);
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    putEntity(entity);
+  @VisibleForTesting
+  boolean isPublishContainerMetrics() {
+    return publishContainerMetrics;
   }
 
-  private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
-        event.getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
-        event.getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
-      eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
-    }
-    RMAppMetrics appMetrics = event.getAppMetrics();
-    entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
-        appMetrics.getVcoreSeconds());
-    entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
-        appMetrics.getMemorySeconds());
-    
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private void publishApplicationACLsUpdatedEvent(
-      ApplicationACLsUpdatedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    TimelineEvent tEvent = new TimelineEvent();
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
-    entity.setOtherInfo(entityInfo);
-    tEvent.setEventType(
-        ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createApplicationEntity(
-      ApplicationId applicationId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(applicationId.toString());
-    return entity;
-  }
-
-  private void
-      publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
-    TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(
-        AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(
-        AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
-        event.getHost());
-    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    eventInfo.put(
-        AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
-        event.getMasterContainerId().toString());
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
-    TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(
-        AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(
-        AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
-        event.getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
-        event.getYarnApplicationAttemptState().toString());
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(
-        AppAttemptMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(appAttemptId.toString());
-    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
-        appAttemptId.getApplicationId().toString());
-    return entity;
-  }
-
-  private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
-        event.getAllocatedResource().getMemory());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
-        event.getAllocatedResource().getVirtualCores());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
-        event.getAllocatedNode().getHost());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
-        event.getAllocatedNode().getPort());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
-    entityInfo.put(
-      ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-      event.getNodeHttpAddress());
-    entity.setOtherInfo(entityInfo);
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
-        event.getContainerState().toString());
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createContainerEntity(
-      ContainerId containerId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(
-        ContainerMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(containerId.toString());
-    entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
-        containerId.getApplicationAttemptId().toString());
-    return entity;
-  }
-
-  private void putEntity(TimelineEntity entity) {
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Publishing the entity " + entity.getEntityId() +
-            ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      }
-      client.putEntities(entity);
-    } catch (Exception e) {
-      LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
-          + entity.getEntityId() + "]", e);
-    }
+  @VisibleForTesting
+  Dispatcher getDispatcher() {
+    return dispatcher;
   }
 
-  /**
-   * EventHandler implementation which forward events to SystemMetricsPublisher.
-   * Making use of it, SystemMetricsPublisher can avoid to have a public handle
-   * method.
-   */
-  private final class ForwardingEventHandler implements
-      EventHandler<SystemMetricsEvent> {
+  interface TimelineServicePublisher extends Service {
+    /**
+     * @return the Dispatcher which needs to be used to dispatch events
+     */
+    Dispatcher getDispatcher();
 
-    @Override
-    public void handle(SystemMetricsEvent event) {
-      handleSystemMetricsEvent(event);
-    }
+    /**
+     * @return true if RMContainerMetricsNeeds to be sent
+     */
+    boolean publishRMContainerMetrics();
 
+    /**
+     * @return EventHandler which needs to be registered to the dispatcher to
+     *         handle the SystemMetricsEvent
+     */
+    EventHandler<SystemMetricsEvent> getEventHandler();
   }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected static class MultiThreadedDispatcher extends CompositeService
-      implements Dispatcher {
-
-    private List<AsyncDispatcher> dispatchers =
-        new ArrayList<AsyncDispatcher>();
-
-    public MultiThreadedDispatcher(int num) {
-      super(MultiThreadedDispatcher.class.getName());
-      for (int i = 0; i < num; ++i) {
-        AsyncDispatcher dispatcher = createDispatcher();
-        dispatchers.add(dispatcher);
-        addIfService(dispatcher);
-      }
-    }
-
-    @Override
-    public EventHandler getEventHandler() {
-      return new CompositEventHandler();
-    }
-
-    @Override
-    public void register(Class<? extends Enum> eventType, EventHandler handler) {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.register(eventType, handler);
-      }
-    }
-
-    public void setDrainEventsOnStop() {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.setDrainEventsOnStop();
-      }
-    }
-
-    private class CompositEventHandler implements EventHandler<Event> {
-
-      @Override
-      public void handle(Event event) {
-        // Use hashCode (of ApplicationId) to dispatch the event to the child
-        // dispatcher, such that all the writing events of one application will
-        // be handled by one thread, the scheduled order of the these events
-        // will be preserved
-        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
-        dispatchers.get(index).getEventHandler().handle(event);
-      }
-
-    }
-
-    protected AsyncDispatcher createDispatcher() {
-      return new AsyncDispatcher();
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
new file mode 100644
index 0000000..c4a85b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+public class TimelineServiceV1Publisher extends
+    AbstractTimelineServicePublisher {
+
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV1Publisher.class);
+
+  public TimelineServiceV1Publisher() {
+    super("TimelineserviceV1Publisher");
+  }
+
+  private TimelineClient client;
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    client = TimelineClient.createTimelineClient();
+    addIfService(client);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+        event.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+        event.getUser());
+    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
+        event.getQueue());
+    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+        event.getSubmittedTime());
+    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
+        event.getAppTags());
+    entity.setOtherInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
+    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationState().toString());
+    if (event.getLatestApplicationAttemptId() != null) {
+      eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
+          event.getLatestApplicationAttemptId().toString());
+    }
+    RMAppMetrics appMetrics = event.getAppMetrics();
+    entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
+        appMetrics.getVcoreSeconds());
+    entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
+        appMetrics.getMemorySeconds());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
+    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+        event.getViewAppACLs());
+    entity.setOtherInfo(entityInfo);
+    tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  private static TimelineEntity createApplicationEntity(
+      ApplicationId applicationId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(applicationId.toString());
+    return entity;
+  }
+
+  @Override
+  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+    TimelineEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+        event.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
+        .getMasterContainerId().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+    TimelineEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationAttemptState().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  private static TimelineEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(appAttemptId.toString());
+    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
+        appAttemptId.getApplicationId().toString());
+    return entity;
+  }
+
+  @Override
+  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+        event.getAllocatedResource().getMemory());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
+        .getAllocatedResource().getVirtualCores());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
+        .getAllocatedNode().getHost());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
+        .getAllocatedNode().getPort());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        event.getAllocatedPriority().getPriority());
+    entityInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        event.getNodeHttpAddress());
+    entity.setOtherInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        event.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
+        .getContainerState().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  private static TimelineEntity createContainerEntity(ContainerId containerId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(containerId.toString());
+    entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
+        containerId.getApplicationAttemptId().toString());
+    return entity;
+  }
+
+  private void putEntity(TimelineEntity entity) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Publishing the entity " + entity.getEntityId()
+            + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      client.putEntities(entity);
+    } catch (Exception e) {
+      LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+          + entity.getEntityId() + "]", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
new file mode 100644
index 0000000..81ce54c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This class is responsible for posting application, appattempt & Container
+ * lifecycle related events to timeline service V2
+ */
+@Private
+@Unstable
+public class TimelineServiceV2Publisher extends
+    AbstractTimelineServicePublisher {
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV2Publisher.class);
+  protected RMTimelineCollectorManager rmTimelineCollectorManager;
+
+  public TimelineServiceV2Publisher(RMContext rmContext) {
+    super("TimelineserviceV2Publisher");
+    rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
+  }
+
+  private boolean publishContainerMetrics;
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    publishContainerMetrics =
+        conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+            YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+    ApplicationEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    entity.setQueue(event.getQueue());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+        event.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+        event.getUser());
+    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+        event.getSubmittedTime());
+    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
+        event.getAppTags());
+    entity.setInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    entity.addEvent(tEvent);
+
+    putEntity(entity, event.getApplicationId());
+  }
+
+  @Override
+  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
+    ApplicationEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    RMAppMetrics appMetrics = event.getAppMetrics();
+    entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
+        appMetrics.getVcoreSeconds());
+    entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
+        appMetrics.getMemorySeconds());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationState().toString());
+    if (event.getLatestApplicationAttemptId() != null) {
+      eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
+          event.getLatestApplicationAttemptId().toString());
+    }
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getApplicationId());
+
+    //cleaning up the collector cached
+    event.getApp().stopTimelineCollector();
+  }
+
+  @Override
+  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
+    ApplicationEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+        event.getViewAppACLs());
+    entity.setInfo(entityInfo);
+
+    putEntity(entity, event.getApplicationId());
+  }
+
+  private static ApplicationEntity createApplicationEntity(
+      ApplicationId applicationId) {
+    ApplicationEntity entity = new ApplicationEntity();
+    entity.setId(applicationId.toString());
+    return entity;
+  }
+
+  @Override
+  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+    TimelineEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+        event.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
+        .getMasterContainerId().toString());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+  }
+
+  @Override
+  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+    ApplicationAttemptEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationAttemptState().toString());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+  }
+
+  @Override
+  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    // updated as event info instead of entity info, as entity info is updated
+    // by NM
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
+        .getAllocatedResource().getMemory());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
+        .getAllocatedResource().getVirtualCores());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
+        .getAllocatedNode().getHost());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
+        .getAllocatedNode().getPort());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        event.getAllocatedPriority().getPriority());
+    eventInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        event.getNodeHttpAddress());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getContainerId().getApplicationAttemptId()
+        .getApplicationId());
+  }
+
+  @Override
+  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        event.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
+        .getContainerState().toString());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getContainerId().getApplicationAttemptId()
+        .getApplicationId());
+  }
+
+  private static ContainerEntity createContainerEntity(ContainerId containerId) {
+    ContainerEntity entity = new ContainerEntity();
+    entity.setId(containerId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT
+        .name(), containerId.getApplicationAttemptId().toString()));
+    return entity;
+  }
+
+  private void putEntity(TimelineEntity entity, ApplicationId appId) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      TimelineCollector timelineCollector =
+          rmTimelineCollectorManager.get(appId);
+      TimelineEntities entities = new TimelineEntities();
+      entities.addEntity(entity);
+      timelineCollector.putEntities(entities,
+          UserGroupInformation.getCurrentUser());
+    } catch (Exception e) {
+      LOG.error("Error when publishing entity " + entity, e);
+    }
+  }
+
+  private static ApplicationAttemptEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
+    entity.setId(appAttemptId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
+        appAttemptId.getApplicationId().toString()));
+    return entity;
+  }
+
+  @Override
+  public boolean publishRMContainerMetrics() {
+    return publishContainerMetrics;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index fa8aad9..bedb21a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1300,8 +1300,6 @@ public class RMAppImpl implements RMApp, Recoverable {
           .applicationFinished(app, finalState);
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
-
-      app.stopTimelineCollector();
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index ae14253..a5bb4df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -89,8 +88,7 @@ public class TestRMAppLogAggregationStatus {
     rmContext =
         new RMContextImpl(rmDispatcher, null, null, null,
           null, null, null, null, null);
-    rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
-    rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
+    rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
 
     rmContext
         .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdb0a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index e0d1b6f..adb2259 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -72,7 +74,7 @@ public class TestSystemMetricsPublisher {
   public static void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
         MemoryTimelineStore.class, TimelineStore.class);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
@@ -86,7 +88,7 @@ public class TestSystemMetricsPublisher {
     timelineServer.start();
     store = timelineServer.getTimelineStore();
 
-    metricsPublisher = new SystemMetricsPublisher();
+    metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }
@@ -351,7 +353,7 @@ public class TestSystemMetricsPublisher {
   }
 
   private static RMApp createRMApp(ApplicationId appId) {
-    RMApp app = mock(RMApp.class);
+    RMApp app = mock(RMAppImpl.class);
     when(app.getApplicationId()).thenReturn(appId);
     when(app.getName()).thenReturn("test app");
     when(app.getApplicationType()).thenReturn("test app type");


Mime
View raw message