hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtcarre...@apache.org
Subject [29/50] [abbrv] hadoop git commit: YARN-4129. Refactor the SystemMetricPublisher in RM to better support newer events (Naganarasimha G R via sjlee)
Date Wed, 20 Jan 2016 09:13:46 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c54f762/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
index 4b93165..d858a6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -30,17 +30,23 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
-public class TimelineServiceV1Publisher extends
-    AbstractTimelineServicePublisher {
+public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
 
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV1Publisher.class);
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServiceV1Publisher.class);
 
   public TimelineServiceV1Publisher() {
     super("TimelineserviceV1Publisher");
@@ -49,76 +55,69 @@ public class TimelineServiceV1Publisher extends
   private TimelineClient client;
 
   @Override
-  public void serviceInit(Configuration conf) throws Exception {
+  protected void serviceInit(Configuration conf) throws Exception {
     client = TimelineClient.createTimelineClient();
     addIfService(client);
     super.serviceInit(conf);
+    getDispatcher().register(SystemMetricsEventType.class,
+        new TimelineV1EventHandler());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
+  public void appCreated(RMApp app, long createdTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
     entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
+        app.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
     entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
+        app.getQueue());
     entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
+        app.getSubmitTime());
     entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
+        app.getApplicationTags());
     entityInfo.put(
         ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-        event.isUnmanagedApp());
+        app.getApplicationSubmissionContext().getUnmanagedAM());
     entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-        event.getApplicationPriority().getPriority());
-    entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
-        event.getAppNodeLabelsExpression());
+        app.getApplicationSubmissionContext().getPriority().getPriority());
     entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
-        event.getAmNodeLabelsExpression());
-    if (event.getCallerContext() != null) {
-      if (event.getCallerContext().getContext() != null) {
-        entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
-            event.getCallerContext().getContext());
-      }
-      if (event.getCallerContext().getSignature() != null) {
-        entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
-            event.getCallerContext().getSignature());
-      }
-    }
+        app.getAmNodeLabelExpression());
+    entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        app.getAppNodeLabelExpression());
     entity.setOtherInfo(entityInfo);
     TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(createdTime);
+
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
   @Override
-  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
+        app.getDiagnostics().toString());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        RMServerUtils.createApplicationState(state).toString());
+    String latestApplicationAttemptId = app.getCurrentAppAttempt() == null
+        ? null : app.getCurrentAppAttempt().getAppAttemptId().toString();
+    if (latestApplicationAttemptId != null) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
+          latestApplicationAttemptId);
     }
-    RMAppMetrics appMetrics = event.getAppMetrics();
+    RMAppMetrics appMetrics = app.getRMAppMetrics();
     entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
         appMetrics.getVcoreSeconds());
     entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@@ -126,161 +125,174 @@ public class TimelineServiceV1Publisher extends
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    // sync sending of finish event to avoid possibility of saving application
+    // finished state in RMStateStore save without publishing in ATS
+    putEntity(entity);// sync event so that ATS update is done without fail
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appUpdated(RMApp app, long updatedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
-        .getApplicationPriority().getPriority());
+        app.getQueue());
+    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        app.getApplicationSubmissionContext().getPriority().getPriority());
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(updatedTime);
     tEvent.setEventInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
 
     TimelineEvent tEvent = new TimelineEvent();
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
+        (appViewACLs == null) ? "" : appViewACLs);
     entity.setOtherInfo(entityInfo);
     tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(updatedTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createApplicationEntity(
-      ApplicationId applicationId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(applicationId.toString());
-    return entity;
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
+
     TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(registeredTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(
-        AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(
-        AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        appAttempt.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
-        event.getHost());
+        appAttempt.getHost());
     eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    if (event.getMasterContainerId() != null) {
-      eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
-          event.getMasterContainerId().toString());
-    }
+        appAttempt.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+        appAttempt.getMasterContainer().getId().toString());
     tEvent.setEventInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(
+        new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
+
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationAttemptState().toString());
-    if (event.getMasterContainerId() != null) {
-      eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
-          event.getMasterContainerId().toString());
-    }
+        appAttempt.getDiagnostics());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
+        .createApplicationAttemptState(appAttemtpState).toString());
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(appAttemptId.toString());
-    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
-        appAttemptId.getApplicationId().toString());
-    return entity;
+    getDispatcher().getEventHandler().handle(
+        new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
+  public void containerCreated(RMContainer container, long createdTime) {
+    TimelineEntity entity = createContainerEntity(container.getContainerId());
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
-        event.getAllocatedResource().getMemory());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
-        .getAllocatedResource().getVirtualCores());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
-        .getAllocatedNode().getHost());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
-        .getAllocatedNode().getPort());
+        container.getAllocatedResource().getMemory());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+        container.getAllocatedResource().getVirtualCores());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+        container.getAllocatedNode().getHost());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+        container.getAllocatedNode().getPort());
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
+        container.getAllocatedPriority().getPriority());
     entityInfo.put(
         ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-        event.getNodeHttpAddress());
+        container.getNodeHttpAddress());
     entity.setOtherInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+            .getContainerId().getApplicationAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
+  public void containerFinished(RMContainer container, long finishedTime) {
+    TimelineEntity entity = createContainerEntity(container.getContainerId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
+        container.getDiagnosticsInfo());
     eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
-        .getContainerState().toString());
+        container.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+        container.getContainerState().toString());
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+            .getContainerId().getApplicationAttemptId().getApplicationId()));
+  }
+
+  private static TimelineEntity createApplicationEntity(
+      ApplicationId applicationId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(applicationId.toString());
+    return entity;
+  }
+
+  private static TimelineEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(appAttemptId.toString());
+    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
+        appAttemptId.getApplicationId().toString());
+    return entity;
   }
 
   private static TimelineEntity createContainerEntity(ContainerId containerId) {
@@ -305,4 +317,26 @@ public class TimelineServiceV1Publisher extends
           + entity.getEntityId() + "]", e);
     }
   }
+
+  private class TimelineV1PublishEvent extends TimelinePublishEvent {
+    private TimelineEntity entity;
+
+    public TimelineV1PublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, ApplicationId appId) {
+      super(type, appId);
+      this.entity = entity;
+    }
+
+    public TimelineEntity getEntity() {
+      return entity;
+    }
+  }
+
+  private class TimelineV1EventHandler
+      implements EventHandler<TimelineV1PublishEvent> {
+    @Override
+    public void handle(TimelineV1PublishEvent event) {
+      putEntity(event.getEntity());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c54f762/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index 3a6c678..0105495 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,93 +38,100 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identif
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for posting application, appattempt & Container
  * lifecycle related events to timeline service V2
  */
 @Private
 @Unstable
-public class TimelineServiceV2Publisher extends
-    AbstractTimelineServicePublisher {
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV2Publisher.class);
+public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServiceV2Publisher.class);
   protected RMTimelineCollectorManager rmTimelineCollectorManager;
+  private boolean publishContainerMetrics;
 
   public TimelineServiceV2Publisher(RMContext rmContext) {
     super("TimelineserviceV2Publisher");
     rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
   }
 
-  private boolean publishContainerMetrics;
-
   @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishContainerMetrics =
-        conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
-            YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
-    super.serviceInit(conf);
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    publishContainerMetrics = getConfig().getBoolean(
+        YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+        YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+    getDispatcher().register(SystemMetricsEventType.class,
+        new TimelineV2EventHandler());
+  }
+
+  @VisibleForTesting
+  boolean isPublishContainerMetrics() {
+    return publishContainerMetrics;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
+  public void appCreated(RMApp app, long createdTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    entity.setQueue(app.getQueue());
+    entity.setCreatedTime(createdTime);
+
     Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
     entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
-    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
+        app.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
     entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
+        app.getSubmitTime());
     entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
+        app.getApplicationTags());
     entityInfo.put(
         ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-        event.isUnmanagedApp());
+        app.getApplicationSubmissionContext().getUnmanagedAM());
     entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-        event.getApplicationPriority().getPriority());
-    entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
-        event.getAppNodeLabelsExpression());
-    entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
-        event.getAmNodeLabelsExpression());
-    if (event.getCallerContext() != null) {
-      if (event.getCallerContext().getContext() != null) {
-        entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
-            event.getCallerContext().getContext());
-      }
-      if (event.getCallerContext().getSignature() != null) {
-        entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
-            event.getCallerContext().getSignature());
-      }
-    }
+        app.getApplicationSubmissionContext().getPriority().getPriority());
+    entity.getConfigs().put(
+        ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
+        app.getAmNodeLabelExpression());
+    entity.getConfigs().put(
+        ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        app.getAppNodeLabelExpression());
     entity.setInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
     entity.addEvent(tEvent);
 
-    putEntity(entity, event.getApplicationId());
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    RMAppMetrics appMetrics = event.getAppMetrics();
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    RMAppMetrics appMetrics = app.getRMAppMetrics();
     entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
         appMetrics.getVcoreSeconds());
     entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@@ -133,54 +139,57 @@ public class TimelineServiceV2Publisher extends
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
+        app.getDiagnostics().toString());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        RMServerUtils.createApplicationState(state).toString());
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null
+        ? null : app.getCurrentAppAttempt().getAppAttemptId();
+    if (appAttemptId != null) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
+          appAttemptId.toString());
     }
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationId());
 
-    //cleaning up the collector cached
-    event.getApp().stopTimelineCollector();
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+        appViewACLs);
+    entity.setInfo(entityInfo);
+
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
+  public void appUpdated(RMApp app, long currentTimeMillis) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
-        .getApplicationPriority().getPriority());
+        app.getQueue());
+    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        app.getApplicationSubmissionContext().getPriority().getPriority());
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(currentTimeMillis);
     tEvent.setInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationId());
-  }
-
-  @Override
-  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
-    entity.setInfo(entityInfo);
-
-    putEntity(entity, event.getApplicationId());
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
   private static ApplicationEntity createApplicationEntity(
@@ -190,111 +199,134 @@ public class TimelineServiceV2Publisher extends
     return entity;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
+    entity.setCreatedTime(registeredTime);
+
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(registeredTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(
-        AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(
-        AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        appAttempt.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
-        event.getHost());
+        appAttempt.getHost());
     eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    if (event.getMasterContainerId() != null) {
-      eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
-          event.getMasterContainerId().toString());
-    }
+        appAttempt.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+        appAttempt.getMasterContainer().getId().toString());
     tEvent.setInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+    getDispatcher().getEventHandler().handle(
+        new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+
     ApplicationAttemptEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationAttemptState().toString());
-    if (event.getMasterContainerId() != null) {
-      eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
-          event.getMasterContainerId().toString());
-    }
+        appAttempt.getDiagnostics());
+    // app will get the final status from app attempt, or create one
+    // based on app state if it doesn't exist
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
+        .createApplicationAttemptState(appAttemtpState).toString());
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+    getDispatcher().getEventHandler().handle(
+        new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
-  @Override
-  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    // updated as event info instead of entity info, as entity info is updated
-    // by NM
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
-        .getAllocatedResource().getMemory());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
-        .getAllocatedResource().getVirtualCores());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
-        .getAllocatedNode().getHost());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
-        .getAllocatedNode().getPort());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
-    eventInfo.put(
-        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-        event.getNodeHttpAddress());
-    tEvent.setInfo(eventInfo);
-
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getContainerId().getApplicationAttemptId()
-        .getApplicationId());
+  private static ApplicationAttemptEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
+    entity.setId(appAttemptId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
+        appAttemptId.getApplicationId().toString()));
+    return entity;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
-        .getContainerState().toString());
-    tEvent.setInfo(eventInfo);
+  public void containerCreated(RMContainer container, long createdTime) {
+    if (publishContainerMetrics) {
+      TimelineEntity entity = createContainerEntity(container.getContainerId());
+      entity.setCreatedTime(createdTime);
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+      tEvent.setTimestamp(createdTime);
+      // updated as event info instead of entity info, as entity info is updated
+      // by NM
+      Map<String, Object> eventInfo = new HashMap<String, Object>();
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+          container.getAllocatedResource().getMemory());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+          container.getAllocatedResource().getVirtualCores());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+          container.getAllocatedNode().getHost());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+          container.getAllocatedNode().getPort());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+          container.getAllocatedPriority().getPriority());
+      eventInfo.put(
+          ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+          container.getNodeHttpAddress());
+      tEvent.setInfo(eventInfo);
+
+      entity.addEvent(tEvent);
+      getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+          SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+              .getContainerId().getApplicationAttemptId().getApplicationId()));
+    }
+  }
 
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getContainerId().getApplicationAttemptId()
-        .getApplicationId());
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+    if (publishContainerMetrics) {
+      TimelineEntity entity = createContainerEntity(container.getContainerId());
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
+      tEvent.setTimestamp(finishedTime);
+      Map<String, Object> eventInfo = new HashMap<String, Object>();
+      eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+          container.getDiagnosticsInfo());
+      eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+          container.getContainerExitStatus());
+      eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+          container.getContainerState().toString());
+      tEvent.setInfo(eventInfo);
+
+      entity.addEvent(tEvent);
+      getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+          SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+              .getContainerId().getApplicationAttemptId().getApplicationId()));
+    }
   }
 
   private static ContainerEntity createContainerEntity(ContainerId containerId) {
@@ -322,17 +354,48 @@ public class TimelineServiceV2Publisher extends
     }
   }
 
-  private static ApplicationAttemptEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
-    entity.setId(appAttemptId.toString());
-    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
-        appAttemptId.getApplicationId().toString()));
-    return entity;
+  private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent {
+    private RMAppImpl app;
+
+    public ApplicationFinishPublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, RMAppImpl app) {
+      super(type, entity, app.getApplicationId());
+      this.app = app;
+    }
+
+    public RMAppImpl getRMAppImpl() {
+      return app;
+    }
   }
 
-  @Override
-  public boolean publishRMContainerMetrics() {
-    return publishContainerMetrics;
+  private class TimelineV2EventHandler
+      implements EventHandler<TimelineV2PublishEvent> {
+    @Override
+    public void handle(TimelineV2PublishEvent event) {
+      switch (event.getType()) {
+      case PUBLISH_APPLICATION_FINISHED_ENTITY:
+        putEntity(event.getEntity(), event.getApplicationId());
+        ((ApplicationFinishPublishEvent) event).getRMAppImpl()
+            .stopTimelineCollector();
+        break;
+      default:
+        putEntity(event.getEntity(), event.getApplicationId());
+        break;
+      }
+    }
+  }
+
+  private class TimelineV2PublishEvent extends TimelinePublishEvent {
+    private TimelineEntity entity;
+
+    public TimelineV2PublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, ApplicationId appId) {
+      super(type, appId);
+      this.entity = entity;
+    }
+
+    public TimelineEntity getEntity() {
+      return entity;
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c54f762/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index ac5db49..2652acf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -69,7 +68,7 @@ import org.junit.Test;
 public class TestSystemMetricsPublisher {
 
   private static ApplicationHistoryServer timelineServer;
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV1Publisher metricsPublisher;
   private static TimelineStore store;
 
   @BeforeClass
@@ -90,7 +89,7 @@ public class TestSystemMetricsPublisher {
     timelineServer.start();
     store = timelineServer.getTimelineStore();
 
-    metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+    metricsPublisher = new TimelineServiceV1Publisher();
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c54f762/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index ac20335..20a5b13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -75,7 +74,7 @@ public class TestSystemMetricsPublisherForV2 {
       TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
       .getAbsoluteFile();
 
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV2Publisher metricsPublisher;
   private static DrainDispatcher dispatcher = new DrainDispatcher();
   private static final String DEFAULT_FLOW_VERSION = "1";
   private static final long DEFAULT_FLOW_RUN = 1;
@@ -103,10 +102,11 @@ public class TestSystemMetricsPublisherForV2 {
     rmTimelineCollectorManager.init(conf);
     rmTimelineCollectorManager.start();
 
-    metricsPublisher = new SystemMetricsPublisher(rmContext) {
+    dispatcher.init(conf);
+    dispatcher.start();
+    metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
       @Override
-      Dispatcher createDispatcher(
-          TimelineServicePublisher timelineServicePublisher) {
+      protected Dispatcher getDispatcher() {
         return dispatcher;
       }
     };
@@ -150,8 +150,8 @@ public class TestSystemMetricsPublisherForV2 {
   @Test
   public void testSystemMetricPublisherInitialization() {
     @SuppressWarnings("resource")
-    SystemMetricsPublisher metricsPublisher =
-        new SystemMetricsPublisher(mock(RMContext.class));
+    TimelineServiceV2Publisher metricsPublisher =
+        new TimelineServiceV2Publisher(mock(RMContext.class));
     try {
       Configuration conf = getTimelineV2Conf();
       conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
@@ -163,20 +163,18 @@ public class TestSystemMetricsPublisherForV2 {
 
       metricsPublisher.stop();
 
-      metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+      metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class));
       conf = getTimelineV2Conf();
       metricsPublisher.init(conf);
+      metricsPublisher.start();
       assertTrue("Expected to publish container Metrics from RM",
           metricsPublisher.isPublishContainerMetrics());
-      assertTrue(
-          "MultiThreadedDispatcher expected when container Metrics is not published",
-          metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher);
     } finally {
       metricsPublisher.stop();
     }
   }
 
-  @Test(timeout = 1000000)
+  @Test(timeout = 10000)
   public void testPublishApplicationMetrics() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     RMApp app = createAppAndRegister(appId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c54f762/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 7637410..a642a78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;


Mime
View raw message