hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [17/50] [abbrv] hadoop git commit: MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.
Date Tue, 25 Aug 2015 18:08:46 GMT
MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.

(cherry picked from commit 5eeb2b156f8e108205945f0a1d06873cb51c3527)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e58f943
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e58f943
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e58f943

Branch: refs/heads/YARN-2928
Commit: 8e58f943ffbb9b6c02422984ee2c77ae45948c9e
Parents: 8fd1aa1
Author: Zhijie Shen <zjshen@apache.org>
Authored: Tue Apr 21 16:31:33 2015 -0700
Committer: Sangjin Lee <sjlee@apache.org>
Committed: Tue Aug 25 10:47:10 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  15 ++
 .../jobhistory/JobHistoryEventHandler.java      | 258 ++++++++++++++++---
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  23 ++
 .../v2/app/rm/RMContainerAllocator.java         |   9 +
 .../hadoop/mapreduce/jobhistory/TestEvents.java |   8 +-
 .../jobhistory/TestJobHistoryEventHandler.java  |   9 +-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   5 +
 .../mapreduce/jobhistory/AMStartedEvent.java    |  18 ++
 .../mapreduce/jobhistory/HistoryEvent.java      |   4 +
 .../mapreduce/jobhistory/JobFinishedEvent.java  |  25 ++
 .../jobhistory/JobInfoChangeEvent.java          |  11 +
 .../mapreduce/jobhistory/JobInitedEvent.java    |  14 +
 .../jobhistory/JobPriorityChangeEvent.java      |  10 +
 .../jobhistory/JobQueueChangeEvent.java         |  10 +
 .../jobhistory/JobStatusChangedEvent.java       |  10 +
 .../mapreduce/jobhistory/JobSubmittedEvent.java |  23 ++
 .../JobUnsuccessfulCompletionEvent.java         |  16 ++
 .../jobhistory/MapAttemptFinishedEvent.java     |  24 +-
 .../jobhistory/NormalizedResourceEvent.java     |  11 +
 .../jobhistory/ReduceAttemptFinishedEvent.java  |  25 +-
 .../jobhistory/TaskAttemptFinishedEvent.java    |  19 ++
 .../jobhistory/TaskAttemptStartedEvent.java     |  18 ++
 .../TaskAttemptUnsuccessfulCompletionEvent.java |  24 ++
 .../mapreduce/jobhistory/TaskFailedEvent.java   |  19 ++
 .../mapreduce/jobhistory/TaskFinishedEvent.java |  18 ++
 .../mapreduce/jobhistory/TaskStartedEvent.java  |  12 +
 .../mapreduce/jobhistory/TaskUpdatedEvent.java  |  10 +
 .../mapreduce/util/JobHistoryEventUtils.java    |  51 ++++
 .../src/main/resources/mapred-default.xml       |   7 +
 .../mapred/TestMRTimelineEventHandling.java     | 163 +++++++++++-
 .../hadoop/mapreduce/v2/MiniMRYarnCluster.java  |  21 +-
 31 files changed, 838 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 305b29e..5ac0d3b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1,5 +1,20 @@
 Hadoop MapReduce Change Log
 
+Branch YARN-2928: Timeline Server Next Generation: Phase 1
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+    MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history
+    events and counters. (Junping Du via zjshen)
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index b0bcfcd..1e84e44 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.mapreduce.jobhistory;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -49,11 +52,13 @@ import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -68,10 +73,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.ObjectNode;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import com.google.common.annotations.VisibleForTesting;
 /**
@@ -119,14 +122,24 @@ public class JobHistoryEventHandler extends AbstractService
 
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
+  
+  // For posting entities in new timeline service in a non-blocking way
+  // TODO YARN-3367 replace with event loop in TimelineClient.
+  private static ExecutorService threadPool =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+          .build());
 
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
   protected TimelineClient timelineClient;
+  
+  private boolean newTimelineServiceEnabled = false;
 
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
+  private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT";
 
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
@@ -246,13 +259,22 @@ public class JobHistoryEventHandler extends AbstractService
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
 
+    // TODO replace MR specific configurations on timeline service with getting 
+    // configuration from RM through registerApplicationMaster() in 
+    // ApplicationMasterProtocol with return value for timeline service 
+    // configuration status: off, on_with_v1 or on_with_v2.
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
       if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
             YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-        timelineClient = TimelineClient.createTimelineClient();
+        
+        timelineClient = 
+            ((MRAppMaster.RunningAppContext)context).getTimelineClient();
         timelineClient.init(conf);
-        LOG.info("Timeline service is enabled");
+        newTimelineServiceEnabled = conf.getBoolean(
+            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
+            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
+        LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
         LOG.info("Emitting job history data to the timeline server is enabled");
       } else {
         LOG.info("Timeline service is not enabled");
@@ -426,9 +448,26 @@ public class JobHistoryEventHandler extends AbstractService
     if (timelineClient != null) {
       timelineClient.stop();
     }
+    shutdownAndAwaitTermination();
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
+  
+  // TODO remove threadPool after adding non-blocking call in TimelineClient
+  private static void shutdownAndAwaitTermination() {
+    threadPool.shutdown();
+    try {
+      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        threadPool.shutdownNow(); 
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
+            LOG.error("ThreadPool did not terminate");
+      }
+    } catch (InterruptedException ie) {
+      threadPool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
 
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
@@ -583,8 +622,13 @@ public class JobHistoryEventHandler extends AbstractService
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
         if (timelineClient != null) {
-          processEventForTimelineServer(historyEvent, event.getJobID(),
-              event.getTimestamp());
+          if (newTimelineServiceEnabled) {
+            processEventForNewTimelineService(historyEvent, event.getJobID(),
+                event.getTimestamp());
+          } else {
+            processEventForTimelineServer(historyEvent, event.getJobID(),
+                event.getTimestamp());
+          }
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
@@ -825,11 +869,11 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
         tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
         tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
-                countersToJSON(jfe.getTotalCounters()));
+            JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
         tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
-                countersToJSON(jfe.getReduceCounters()));
+            JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
         tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
-                countersToJSON(jfe.getTotalCounters()));
+            JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
         tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(jobId.toString());
@@ -855,7 +899,7 @@ public class JobHistoryEventHandler extends AbstractService
                 tfe.getFailedAttemptID() == null ?
                 "" : tfe.getFailedAttemptID().toString());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tfe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tfe.getTaskId().toString());
         tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -873,7 +917,7 @@ public class JobHistoryEventHandler extends AbstractService
         TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
         tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tfe2.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
         tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
         tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
         tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
@@ -895,7 +939,6 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("START_TIME", tase.getStartTime());
         tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
         tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
-        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
         tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
         tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
             "" : tase.getContainerId().toString());
@@ -928,7 +971,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
         tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tauce.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tauce.getTaskId().toString());
         tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -942,7 +985,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("STATE", mafe.getState());
         tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(mafe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
         tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
         tEvent.addEventInfo("PORT", mafe.getPort());
         tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
@@ -964,7 +1007,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
         tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(rafe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
         tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
         tEvent.addEventInfo("PORT", rafe.getPort());
         tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
@@ -983,7 +1026,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
         tEvent.addEventInfo("STATE", tafe.getState());
         tEvent.addEventInfo("COUNTERS_GROUPS",
-                countersToJSON(tafe.getCounters()));
+            JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
         tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(tafe.getTaskId().toString());
@@ -1010,37 +1053,172 @@ public class JobHistoryEventHandler extends AbstractService
       default:
         break;
     }
-
+    
     try {
       timelineClient.putEntities(tEntity);
-    } catch (IOException ex) {
-      LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
-      + "Server", ex);
-    } catch (YarnException ex) {
+    } catch (IOException|YarnException ex) {
       LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
       + "Server", ex);
     }
   }
-
-  @Private
-  public JsonNode countersToJSON(Counters counters) {
-    ObjectMapper mapper = new ObjectMapper();
-    ArrayNode nodes = mapper.createArrayNode();
-    if (counters != null) {
-      for (CounterGroup counterGroup : counters) {
-        ObjectNode groupNode = nodes.addObject();
-        groupNode.put("NAME", counterGroup.getName());
-        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
-        ArrayNode countersNode = groupNode.putArray("COUNTERS");
-        for (Counter counter : counterGroup) {
-          ObjectNode counterNode = countersNode.addObject();
-          counterNode.put("NAME", counter.getName());
-          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
-          counterNode.put("VALUE", counter.getValue());
+  
+  private void putEntityWithoutBlocking(final TimelineClient timelineClient, 
+      final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+        try {
+          timelineClient.putEntities(entity);
+        } catch (IOException|YarnException e) {
+          LOG.error("putEntityNonBlocking get failed: " + e);
+          throw new RuntimeException(e.toString());
         }
       }
+    };
+    threadPool.execute(publishWrapper);
+  }
+  
+  // create JobEntity from HistoryEvent with adding other info, like: 
+  // jobId, timestamp and entityType.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createJobEntity(HistoryEvent event, long timestamp, JobId jobId, 
+      String entityType) {
+    
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        createBaseEntity(event, timestamp, entityType);
+    entity.setId(jobId.toString());
+    return entity;
+  }
+  
+  // create BaseEntity from HistoryEvent with adding other info, like: 
+  // timestamp and entityType.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createBaseEntity(HistoryEvent event, long timestamp, String entityType) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent = 
+        event.toTimelineEvent();
+    tEvent.setTimestamp(timestamp);
+    
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.addEvent(tEvent);
+    entity.setType(entityType);
+    return entity;
+  }
+  
+  // create TaskEntity from HistoryEvent with adding other info, like: 
+  // taskId, jobId, timestamp, entityType and relatedJobEntity.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createTaskEntity(HistoryEvent event, long timestamp, String taskId,
+      String entityType, String relatedJobEntity, JobId jobId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        createBaseEntity(event, timestamp, entityType);
+    entity.setId(taskId);
+    entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+    return entity;
+  }
+  
+  // create TaskAttemptEntity from HistoryEvent with adding other info, like: 
+  // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
+  private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
+      createTaskAttemptEntity(HistoryEvent event, long timestamp, 
+      String taskAttemptId, String entityType, String relatedTaskEntity, 
+      String taskId) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        createBaseEntity(event, timestamp, entityType);
+    entity.setId(taskAttemptId);
+    entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+    return entity;
+  }
+  
+  private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
+      long timestamp) {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null;
+    String taskId = null;
+    String taskAttemptId = null;
+
+    switch (event.getEventType()) {
+      // Handle job events
+      case JOB_SUBMITTED:
+      case JOB_STATUS_CHANGED:
+      case JOB_INFO_CHANGED:
+      case JOB_INITED:
+      case JOB_PRIORITY_CHANGED:
+      case JOB_QUEUE_CHANGED:
+      case JOB_FAILED:
+      case JOB_KILLED:
+      case JOB_ERROR:
+      case JOB_FINISHED:
+      case AM_STARTED:
+      case NORMALIZED_RESOURCE:
+        break;
+      // Handle task events
+      case TASK_STARTED:
+        taskId = ((TaskStartedEvent)event).getTaskId().toString();
+        break;
+      case TASK_FAILED:
+        taskId = ((TaskFailedEvent)event).getTaskId().toString();
+        break;
+      case TASK_UPDATED:
+        taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
+        break;
+      case TASK_FINISHED:
+        taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+        break;
+      case MAP_ATTEMPT_STARTED:
+      case CLEANUP_ATTEMPT_STARTED:
+      case REDUCE_ATTEMPT_STARTED:
+      case SETUP_ATTEMPT_STARTED:
+        taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
+        taskAttemptId = ((TaskAttemptStartedEvent)event).
+            getTaskAttemptId().toString();
+        break;
+      case MAP_ATTEMPT_FAILED:
+      case CLEANUP_ATTEMPT_FAILED:
+      case REDUCE_ATTEMPT_FAILED:
+      case SETUP_ATTEMPT_FAILED:
+      case MAP_ATTEMPT_KILLED:
+      case CLEANUP_ATTEMPT_KILLED:
+      case REDUCE_ATTEMPT_KILLED:
+      case SETUP_ATTEMPT_KILLED:
+        taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString();
+        taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
+            getTaskAttemptId().toString();
+        break;
+      case MAP_ATTEMPT_FINISHED:
+        taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
+        taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString();
+        break;
+      case REDUCE_ATTEMPT_FINISHED:
+        taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
+        taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString();
+        break;
+      case SETUP_ATTEMPT_FINISHED:
+      case CLEANUP_ATTEMPT_FINISHED:
+        taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
+        taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString();
+        break;
+      default:
+        LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
+            " and handled by timeline service.");
+        return;
     }
-    return nodes;
+    if (taskId == null) {
+      // JobEntity
+      tEntity = createJobEntity(event, timestamp, jobId,
+          MAPREDUCE_JOB_ENTITY_TYPE);
+    } else {
+      if (taskAttemptId == null) {
+        // TaskEntity
+        tEntity = createTaskEntity(event, timestamp, taskId,
+            MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId);
+      } else {
+        // TaskAttemptEntity
+        tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
+            MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
+            taskId);
+      }
+    }
+
+    putEntityWithoutBlocking(timelineClient, tEntity);
   }
 
   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 6dc830f..dafb6e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -1004,6 +1005,7 @@ public class MRAppMaster extends CompositeService {
     private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
+    private TimelineClient timelineClient = null;
 
     private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
 
@@ -1013,6 +1015,23 @@ public class MRAppMaster extends CompositeService {
       this.clientToAMTokenSecretManager =
           new ClientToAMTokenSecretManager(appAttemptID, null);
       this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
+      if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+              MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
+            && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+                YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+
+        boolean newTimelineServiceEnabled = conf.getBoolean(
+            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
+            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
+            
+        if (newTimelineServiceEnabled) {
+          // create new version TimelineClient
+          timelineClient = TimelineClient.createTimelineClient(
+              appAttemptID.getApplicationId());
+        } else {
+          timelineClient = TimelineClient.createTimelineClient();
+        }
+      }
     }
 
     @Override
@@ -1103,6 +1122,10 @@ public class MRAppMaster extends CompositeService {
       return taskAttemptFinishingMonitor;
     }
 
+    // Get Timeline Collector's address (get sync from RM)
+    public TimelineClient getTimelineClient() {
+      return timelineClient;
+    }
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index ac4c586..e5338b5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -769,6 +770,14 @@ public class RMContainerAllocator extends RMContainerRequestor
     computeIgnoreBlacklisting();
 
     handleUpdatedNodes(response);
+    String collectorAddr = response.getCollectorAddr();
+    MRAppMaster.RunningAppContext appContext = 
+        (MRAppMaster.RunningAppContext)this.getContext();
+    if (collectorAddr != null && !collectorAddr.isEmpty()
+        && appContext.getTimelineClient() != null) {
+      appContext.getTimelineClient().setTimelineServiceAddress(
+        response.getCollectorAddr());
+    }
 
     for (ContainerStatus cont : finishedContainers) {
       LOG.info("Received completed container " + cont.getContainerId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
index 7612ceb..e7d5006 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.junit.Test;
 
 public class TestEvents {
@@ -404,7 +405,12 @@ public class TestEvents {
     public void setDatum(Object datum) {
       this.datum = datum;
     }
-
+    
+    @Override
+    public TimelineEvent toTimelineEvent() {
+      return null;
+    }
+    
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 2b07efb..2c159f1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -666,7 +667,7 @@ public class TestJobHistoryEventHandler {
     group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
     group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
     group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
-    JsonNode jsonNode = jheh.countersToJSON(counters);
+    JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
         + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
@@ -689,19 +690,19 @@ public class TestJobHistoryEventHandler {
   public void testCountersToJSONEmpty() throws Exception {
     JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
     Counters counters = null;
-    JsonNode jsonNode = jheh.countersToJSON(counters);
+    JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     String expected = "[]";
     Assert.assertEquals(expected, jsonStr);
 
     counters = new Counters();
-    jsonNode = jheh.countersToJSON(counters);
+    jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     expected = "[]";
     Assert.assertEquals(expected, jsonStr);
 
     counters.addGroup("DOCTORS", "Incarnations of the Doctor");
-    jsonNode = jheh.countersToJSON(counters);
+    jsonNode = JobHistoryEventUtils.countersToJSON(counters);
     jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
     expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
         + "Doctor\",\"COUNTERS\":[]}]";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 59b887d..9c1b835 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -462,6 +462,11 @@ public interface MRJobConfig {
     "mapreduce.job.emit-timeline-data";
   public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
       false;
+  
+  public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
+      "mapreduce.job.new-timeline-service.enabled";
+  public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
+      false;
 
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
index ea2ca9e..bbc7090 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import org.apache.avro.util.Utf8;
@@ -166,4 +168,20 @@ public class AMStartedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.AM_STARTED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("APPLICATION_ATTEMPT_ID",
+        getAppAttemptId() == null ? "" : getAppAttemptId().toString());
+    tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+        "" : getContainerId().toString());
+    tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost());
+    tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort());
+    tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort());
+    tEvent.addInfo("START_TIME", getStartTime());
+    return tEvent;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
index a30748c..61ce217 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Interface for event wrapper classes.  Implementations each wrap an
@@ -37,4 +38,7 @@ public interface HistoryEvent {
 
   /** Set the Avro datum wrapped by this. */
   void setDatum(Object datum);
+  
+  /** Map HistoryEvent to TimelineEvent */
+  TimelineEvent toTimelineEvent();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
index 0fa5868..80d3ee6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
@@ -23,6 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful completion of job
@@ -133,4 +136,26 @@ public class JobFinishedEvent  implements HistoryEvent {
   public Counters getReduceCounters() {
     return reduceCounters;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("NUM_MAPS", getFinishedMaps());
+    tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+    tEvent.addInfo("FAILED_MAPS", getFailedMaps());
+    tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
+    tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
+    tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+    tEvent.addInfo("MAP_COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getMapCounters()));
+    tEvent.addInfo("REDUCE_COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getReduceCounters()));
+    tEvent.addInfo("TOTAL_COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getTotalCounters()));
+    // TODO replace SUCCEEDED with JobState.SUCCEEDED.toString()
+    tEvent.addInfo("JOB_STATUS", "SUCCEEDED");
+    return tEvent;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
index b45f373..ad82443 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -64,5 +66,14 @@ public class JobInfoChangeEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.JOB_INFO_CHANGED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
+    tEvent.addInfo("LAUNCH_TIME", getLaunchTime());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
index 5abb40e..3e0f2f7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -73,4 +75,16 @@ public class JobInitedEvent implements HistoryEvent {
   }
   /** Get whether the job's map and reduce stages were combined */
   public boolean getUberized() { return datum.getUberized(); }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("START_TIME", getLaunchTime());
+    tEvent.addInfo("STATUS", getStatus());
+    tEvent.addInfo("TOTAL_MAPS", getTotalMaps());
+    tEvent.addInfo("TOTAL_REDUCES", getTotalReduces());
+    tEvent.addInfo("UBERIZED", getUberized());
+    return tEvent;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
index 6c32462..5deea0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -64,5 +66,13 @@ public class JobPriorityChangeEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.JOB_PRIORITY_CHANGED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("PRIORITY", getPriority().toString());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
index 86078e6..b9dd359 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 @SuppressWarnings("deprecation")
 public class JobQueueChangeEvent implements HistoryEvent {
@@ -59,5 +61,13 @@ public class JobQueueChangeEvent implements HistoryEvent {
     }
     return null;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("QUEUE_NAMES", getJobQueueName());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
index 9e11a55..a4f2da2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -60,5 +62,13 @@ public class JobStatusChangedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.JOB_STATUS_CHANGED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("STATUS", getStatus());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index 845f6f7..47b2840 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -205,5 +207,26 @@ public class JobSubmittedEvent implements HistoryEvent {
   }
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
+    tEvent.addInfo("QUEUE_NAME", getJobQueueName());
+    tEvent.addInfo("JOB_NAME", getJobName());
+    tEvent.addInfo("USER_NAME", getUserName());
+    tEvent.addInfo("JOB_CONF_PATH", getJobConfPath());
+    tEvent.addInfo("ACLS", getJobAcls());
+    tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName());
+    tEvent.addInfo("WORKLFOW_ID", getWorkflowId());
+    tEvent.addInfo("WORKFLOW_NAME", getWorkflowName());
+    tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName());
+    tEvent.addInfo("WORKFLOW_ADJACENCIES",
+        getWorkflowAdjacencies());
+    tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags());
+    
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
index 1b7773d..ea9798c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
@@ -24,6 +24,8 @@ import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import java.util.Collections;
 
@@ -119,4 +121,18 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
     final CharSequence diagnostics = datum.getDiagnostics();
     return diagnostics == null ? NODIAGS : diagnostics.toString();
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("NUM_MAPS", getFinishedMaps());
+    tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+    tEvent.addInfo("JOB_STATUS", getStatus());
+    tEvent.addInfo("DIAGNOSTICS", getDiagnostics());
+    tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
+    tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+    return tEvent;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index bc046c7..36737e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful completion of a map attempt
@@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MapAttemptFinishedEvent  implements HistoryEvent {
+public class MapAttemptFinishedEvent implements HistoryEvent {
 
   private MapAttemptFinished datum = null;
 
@@ -218,4 +221,23 @@ public class MapAttemptFinishedEvent  implements HistoryEvent {
     return physMemKbytes;
   }
   
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    return tEvent;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
index b8f049c..daa454c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record the normalized map/reduce requirements.
@@ -71,4 +73,13 @@ public class NormalizedResourceEvent implements HistoryEvent {
   public void setDatum(Object datum) {
     throw new UnsupportedOperationException("Not a seriable object");
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("MEMORY", "" + getMemory());
+    tEvent.addInfo("TASK_TYPE", getTaskType());
+    return tEvent;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index 6644a48..6087c7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful completion of a reduce attempt
@@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class ReduceAttemptFinishedEvent  implements HistoryEvent {
+public class ReduceAttemptFinishedEvent implements HistoryEvent {
 
   private ReduceAttemptFinished datum = null;
 
@@ -222,5 +225,25 @@ public class ReduceAttemptFinishedEvent  implements HistoryEvent {
   public int[] getPhysMemKbytes() {
     return physMemKbytes;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime());
+    tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index bb7dbe0..c7c4387 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record successful task completion
@@ -135,5 +138,21 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
            ? EventType.MAP_ATTEMPT_FINISHED
            : EventType.REDUCE_ATTEMPT_FINISHED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+        "" : getAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("STATE", getState());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("HOSTNAME", getHostname());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
index c8c250a..d8baec4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -132,5 +134,21 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
     }
     return null;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("TASK_ATTEMPT_ID",
+        getTaskAttemptId().toString());
+    tEvent.addInfo("START_TIME", getStartTime());
+    tEvent.addInfo("HTTP_PORT", getHttpPort());
+    tEvent.addInfo("TRACKER_NAME", getTrackerName());
+    tEvent.addInfo("SHUFFLE_PORT", getShufflePort());
+    tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+        "" : getContainerId().toString());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 77ee2a0..0bb1358 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.hadoop.mapred.ProgressSplitsBlock;
 
@@ -247,5 +250,26 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
   public int[] getPhysMemKbytes() {
     return physMemKbytes;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ?
+        "" : getTaskAttemptId().toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("ERROR", getError());
+    tEvent.addInfo("STATUS", getTaskStatus());
+    tEvent.addInfo("HOSTNAME", getHostname());
+    tEvent.addInfo("PORT", getPort());
+    tEvent.addInfo("RACK_NAME", getRackName());
+    tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("SORT_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("MAP_FINISH_TIME", getFinishTime());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
index 2838f08..5e82dea 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
@@ -20,10 +20,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -136,5 +140,20 @@ public class TaskFailedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.TASK_FAILED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString());
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("ERROR", getError());
+    tEvent.addInfo("FAILED_ATTEMPT_ID",
+        getFailedAttemptID() == null ? "" : getFailedAttemptID().toString());
+    tEvent.addInfo("COUNTERS_GROUPS", 
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index d4ec74d..e359e32 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -21,10 +21,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record the successful completion of a task
@@ -115,5 +119,19 @@ public class TaskFinishedEvent implements HistoryEvent {
     return EventType.TASK_FINISHED;
   }
 
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("COUNTERS_GROUPS",
+        JobHistoryEventUtils.countersToJSON(getCounters()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
+    tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
+        getSuccessfulTaskAttemptId() == null ? "" : 
+            getSuccessfulTaskAttemptId().toString());
+    return tEvent;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
index ed53b03..d1b97bf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 /**
  * Event to record the start of a task
@@ -71,5 +73,15 @@ public class TaskStartedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.TASK_STARTED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+    tEvent.addInfo("START_TIME", getStartTime());
+    tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
index 58f4143..b9a389c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 
 import org.apache.avro.util.Utf8;
 
@@ -60,5 +62,13 @@ public class TaskUpdatedEvent implements HistoryEvent {
   public EventType getEventType() {
     return EventType.TASK_UPDATED;
   }
+  
+  @Override
+  public TimelineEvent toTimelineEvent() {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+    tEvent.addInfo("FINISH_TIME", getFinishTime());
+    return tEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
new file mode 100644
index 0000000..f4896ff
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.util;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+
+public class JobHistoryEventUtils {
+
+  public static JsonNode countersToJSON(Counters counters) {
+    ObjectMapper mapper = new ObjectMapper();
+    ArrayNode nodes = mapper.createArrayNode();
+    if (counters != null) {
+      for (CounterGroup counterGroup : counters) {
+        ObjectNode groupNode = nodes.addObject();
+        groupNode.put("NAME", counterGroup.getName());
+        groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
+        ArrayNode countersNode = groupNode.putArray("COUNTERS");
+        for (Counter counter : counterGroup) {
+          ObjectNode counterNode = countersNode.addObject();
+          counterNode.put("NAME", counter.getName());
+          counterNode.put("DISPLAY_NAME", counter.getDisplayName());
+          counterNode.put("VALUE", counter.getValue());
+        }
+      }
+    }
+    return nodes;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e58f943/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 6d205c5..c7f61cf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -607,6 +607,13 @@
     </description>
 </property>
 
+ <property>
+    <name>mapreduce.job.new-timeline-service.enabled</name>
+    <value>false</value>
+    <description>Specifies if posting job and task events to new timeline service.
+    </description>
+</property>
+
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>


Mime
View raw message