hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [4/4] hadoop git commit: YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.
Date Mon, 21 Nov 2016 22:37:43 GMT
YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.

(cherry picked from commit f734977b27a514ce0561638c0a6a17b1ef093026)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c0cc0eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c0cc0eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c0cc0eb

Branch: refs/heads/YARN-5355-branch-2
Commit: 5c0cc0ebf51cfec3432d96b78a0493034b52397f
Parents: 0b89ab2
Author: Sangjin Lee <sjlee@apache.org>
Authored: Mon Nov 21 13:48:35 2016 -0800
Committer: Sangjin Lee <sjlee@apache.org>
Committed: Mon Nov 21 14:36:52 2016 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  27 ++-
 .../v2/app/job/impl/TaskAttemptImpl.java        |  58 +++----
 .../mapreduce/v2/app/job/impl/TaskImpl.java     |  19 +-
 .../hadoop/mapreduce/jobhistory/TestEvents.java |   4 +-
 .../jobhistory/TestJobHistoryEventHandler.java  |   8 +-
 .../jobhistory/MapAttemptFinishedEvent.java     |  87 ++++++----
 .../jobhistory/ReduceAttemptFinishedEvent.java  |  83 ++++++---
 .../jobhistory/TaskAttemptFinishedEvent.java    |  47 +++--
 .../TaskAttemptUnsuccessfulCompletionEvent.java |  50 ++++--
 .../mapreduce/jobhistory/TaskFailedEvent.java   |  51 ++++--
 .../mapreduce/jobhistory/TaskFinishedEvent.java |  42 +++--
 .../mapred/TestMRTimelineEventHandling.java     |  30 +++-
 .../distributedshell/ApplicationMaster.java     |  41 ++++-
 .../distributedshell/TestDistributedShell.java  | 173 +++++++++++--------
 .../containermanager/ContainerManagerImpl.java  |   6 +-
 .../ApplicationContainerFinishedEvent.java      |   9 +-
 .../containermanager/container/Container.java   |   2 +
 .../container/ContainerImpl.java                |  22 ++-
 .../recovery/NMLeveldbStateStoreService.java    |  21 ++-
 .../recovery/NMNullStateStoreService.java       |   2 +-
 .../recovery/NMStateStoreService.java           |  13 +-
 .../timelineservice/NMTimelinePublisher.java    |  18 +-
 .../application/TestApplication.java            |   2 +-
 .../recovery/NMMemoryStateStoreService.java     |   6 +-
 .../TestNMLeveldbStateStoreService.java         |   4 +-
 .../nodemanager/webapp/MockContainer.java       |   5 +
 .../nodemanager/webapp/TestNMWebServer.java     |   4 +-
 .../metrics/TimelineServiceV2Publisher.java     |  12 +-
 .../TestSystemMetricsPublisherForV2.java        |  13 +-
 29 files changed, 579 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index deb2aab..ee5f8bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.node.JsonNodeFactory;
 
@@ -1119,7 +1120,7 @@ public class JobHistoryEventHandler extends AbstractService
   private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
       createTaskEntity(HistoryEvent event, long timestamp, String taskId,
       String entityType, String relatedJobEntity, JobId jobId,
-      boolean setCreatedTime) {
+      boolean setCreatedTime, long taskIdPrefix) {
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         createBaseEntity(event, timestamp, entityType, setCreatedTime);
     entity.setId(taskId);
@@ -1128,6 +1129,7 @@ public class JobHistoryEventHandler extends AbstractService
           ((TaskStartedEvent)event).getTaskType().toString());
     }
     entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+    entity.setIdPrefix(taskIdPrefix);
     return entity;
   }
 
@@ -1136,11 +1138,12 @@ public class JobHistoryEventHandler extends AbstractService
   private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
       createTaskAttemptEntity(HistoryEvent event, long timestamp,
       String taskAttemptId, String entityType, String relatedTaskEntity,
-      String taskId, boolean setCreatedTime) {
+      String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) {
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         createBaseEntity(event, timestamp, entityType, setCreatedTime);
     entity.setId(taskAttemptId);
     entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+    entity.setIdPrefix(taskAttemptIdPrefix);
     return entity;
   }
 
@@ -1191,6 +1194,8 @@ public class JobHistoryEventHandler extends AbstractService
     String taskId = null;
     String taskAttemptId = null;
     boolean setCreatedTime = false;
+    long taskIdPrefix = 0;
+    long taskAttemptIdPrefix = 0;
 
     switch (event.getEventType()) {
     // Handle job events
@@ -1213,15 +1218,21 @@ public class JobHistoryEventHandler extends AbstractService
     case TASK_STARTED:
       setCreatedTime = true;
       taskId = ((TaskStartedEvent)event).getTaskId().toString();
+      taskIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskStartedEvent)event).getStartTime());
       break;
     case TASK_FAILED:
       taskId = ((TaskFailedEvent)event).getTaskId().toString();
+      taskIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskFailedEvent)event).getStartTime());
       break;
     case TASK_UPDATED:
       taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
       break;
     case TASK_FINISHED:
       taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+      taskIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskFinishedEvent)event).getStartTime());
       break;
     case MAP_ATTEMPT_STARTED:
     case REDUCE_ATTEMPT_STARTED:
@@ -1229,6 +1240,8 @@ public class JobHistoryEventHandler extends AbstractService
       taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
       taskAttemptId = ((TaskAttemptStartedEvent)event).
           getTaskAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.
+          invertLong(((TaskAttemptStartedEvent)event).getStartTime());
       break;
     case CLEANUP_ATTEMPT_STARTED:
     case SETUP_ATTEMPT_STARTED:
@@ -1248,16 +1261,22 @@ public class JobHistoryEventHandler extends AbstractService
           getTaskId().toString();
       taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
           getTaskAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
+          ((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
       break;
     case MAP_ATTEMPT_FINISHED:
       taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
       taskAttemptId = ((MapAttemptFinishedEvent)event).
           getAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.
+          invertLong(((MapAttemptFinishedEvent)event).getStartTime());
       break;
     case REDUCE_ATTEMPT_FINISHED:
       taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
       taskAttemptId = ((ReduceAttemptFinishedEvent)event).
           getAttemptId().toString();
+      taskAttemptIdPrefix = TimelineServiceHelper.
+          invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
       break;
     case SETUP_ATTEMPT_FINISHED:
     case CLEANUP_ATTEMPT_FINISHED:
@@ -1286,12 +1305,12 @@ public class JobHistoryEventHandler extends AbstractService
         // TaskEntity
         tEntity = createTaskEntity(event, timestamp, taskId,
             MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
-            jobId, setCreatedTime);
+            jobId, setCreatedTime, taskIdPrefix);
       } else {
         // TaskAttemptEntity
         tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
             MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
-            taskId, setCreatedTime);
+            taskId, setCreatedTime, taskAttemptIdPrefix);
       }
     }
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index cdfff3d..4e3d9b2 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1525,7 +1525,7 @@ public abstract class TaskAttemptImpl implements
             StringUtils.join(
                 LINE_SEPARATOR, taskAttempt.getDiagnostics()),
                 taskAttempt.getCounters(), taskAttempt
-                .getProgressSplitBlock().burst());
+                .getProgressSplitBlock().burst(), taskAttempt.launchTime);
     return tauce;
   }
 
@@ -1938,35 +1938,35 @@ public abstract class TaskAttemptImpl implements
         this.container == null ? -1 : this.container.getNodeId().getPort();
     if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
       MapAttemptFinishedEvent mfe =
-         new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
-         TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
-         state.toString(),
-         this.reportedStatus.mapFinishTime,
-         finishTime,
-         containerHostName,
-         containerNodePort,
-         this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
-         this.reportedStatus.stateString,
-         getCounters(),
-         getProgressSplitBlock().burst());
-         eventHandler.handle(
-           new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
+          new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+          state.toString(),
+          this.reportedStatus.mapFinishTime,
+          finishTime,
+          containerHostName,
+          containerNodePort,
+          this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+          this.reportedStatus.stateString,
+          getCounters(),
+          getProgressSplitBlock().burst(), launchTime);
+      eventHandler.handle(
+          new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
     } else {
-       ReduceAttemptFinishedEvent rfe =
-         new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
-         TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
-         state.toString(),
-         this.reportedStatus.shuffleFinishTime,
-         this.reportedStatus.sortFinishTime,
-         finishTime,
-         containerHostName,
-         containerNodePort,
-         this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
-         this.reportedStatus.stateString,
-         getCounters(),
-         getProgressSplitBlock().burst());
-         eventHandler.handle(
-           new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
+      ReduceAttemptFinishedEvent rfe =
+          new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+          state.toString(),
+          this.reportedStatus.shuffleFinishTime,
+          this.reportedStatus.sortFinishTime,
+          finishTime,
+          containerHostName,
+          containerNodePort,
+          this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+          this.reportedStatus.stateString,
+          getCounters(),
+          getProgressSplitBlock().burst(), launchTime);
+      eventHandler.handle(
+          new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 34d9f0e..3ab1f31 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final Set<TaskAttemptId> inProgressAttempts;
 
   private boolean historyTaskStartGenerated = false;
+  // Launch time reported in history events.
+  private long launchTime;
   
   private static final SingleArcTransition<TaskImpl, TaskEvent> 
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
@@ -704,8 +706,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private void sendTaskStartedEvent() {
+    launchTime = getLaunchTime();
     TaskStartedEvent tse = new TaskStartedEvent(
-        TypeConverter.fromYarn(taskId), getLaunchTime(),
+        TypeConverter.fromYarn(taskId), launchTime,
         TypeConverter.fromYarn(taskId.getTaskType()),
         getSplitsAsString());
     eventHandler
@@ -713,18 +716,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     historyTaskStartGenerated = true;
   }
 
-  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
+  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
+      TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
         TypeConverter.fromYarn(task.successfulAttempt),
         task.getFinishTime(task.successfulAttempt),
         TypeConverter.fromYarn(task.taskId.getTaskType()),
-        taskState.toString(),
-        task.getCounters());
+        taskState.toString(), task.getCounters(), task.launchTime);
     return tfe;
   }
   
-  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
+  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
+      List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
     StringBuilder errorSb = new StringBuilder();
     if (diag != null) {
       for (String d : diag) {
@@ -739,7 +743,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         errorSb.toString(),
         taskState.toString(),
         taId == null ? null : TypeConverter.fromYarn(taId),
-        task.getCounters());
+        task.getCounters(), task.launchTime);
     return taskFailedEvent;
   }
   
@@ -860,7 +864,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
           taskInfo.getFinishTime(), taskInfo.getTaskType(),
           taskInfo.getError(), taskInfo.getTaskStatus(),
-          taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+          taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(),
+          launchTime);
       eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
       eventHandler.handle(
           new JobTaskEvent(taskId, getExternalState(taskState)));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
index ac510b3..e271319 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
@@ -58,7 +58,7 @@ public class TestEvents {
     Counters counters = new Counters();
     TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
         TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
-        counters);
+        counters, 234);
     assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
 
     assertEquals(test.getCounters(), counters);
@@ -69,7 +69,7 @@ public class TestEvents {
     assertEquals(test.getTaskId(), tid);
     assertEquals(test.getTaskStatus(), "TEST");
     assertEquals(test.getTaskType(), TaskType.REDUCE);
-
+    assertEquals(234, test.getStartTime());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 01c53e3..9607cb6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -142,7 +142,7 @@ public class TestJobHistoryEventHandler {
 
       // First completion event, but min-queue-size for batching flushes is 10
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-          t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+          t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
       verify(mockWriter).flush();
 
     } finally {
@@ -178,7 +178,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -223,7 +223,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -266,7 +266,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
       }
       queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index 3121c4e..2b1357e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
- * Event to record successful completion of a map attempt
+ * Event to record successful completion of a map attempt.
  *
  */
 @InterfaceAudience.Private
@@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
   int[] cpuUsages;
   int[] vMemKbytes;
   int[] physMemKbytes;
+  private long startTime;
 
   /** 
-   * Create an event for successful completion of map attempts
+   * Create an event for successful completion of map attempts.
    * @param id Task Attempt ID
    * @param taskType Type of the task
    * @param taskStatus Status of the task
@@ -77,12 +79,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
    *        virtual memory and physical memory. 
    *
    *        If you have no splits data, code {@code null} for this
-   *        parameter. 
+   *        parameter.
+   * @param startTs Task start time to be used for writing entity to ATSv2.
    */
-  public MapAttemptFinishedEvent
-      (TaskAttemptID id, TaskType taskType, String taskStatus, 
-       long mapFinishTime, long finishTime, String hostname, int port, 
-       String rackName, String state, Counters counters, int[][] allSplits) {
+  public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long mapFinishTime, long finishTime, String hostname,
+      int port, String rackName, String state, Counters counters,
+      int[][] allSplits, long startTs) {
     this.attemptId = id;
     this.taskType = taskType;
     this.taskStatus = taskStatus;
@@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
     this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
     this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
     this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+    this.startTime = startTs;
+  }
+
+  public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long mapFinishTime, long finishTime, String hostname,
+      int port, String rackName, String state, Counters counters,
+      int[][] allSplits) {
+    this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port,
+        rackName, state, counters, allSplits,
+        SystemClock.getInstance().getTime());
   }
 
   /** 
@@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
    * @param counters Counters for the attempt
    */
   @Deprecated
-  public MapAttemptFinishedEvent
-      (TaskAttemptID id, TaskType taskType, String taskStatus, 
-       long mapFinishTime, long finishTime, String hostname,
-       String state, Counters counters) {
+  public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long mapFinishTime, long finishTime, String hostname,
+      String state, Counters counters) {
     this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "",
         state, counters, null);
   }
-  
-  
+
   MapAttemptFinishedEvent() {}
 
   public Object getDatum() {
@@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
     this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
   }
 
-  /** Get the task ID */
-  public TaskID getTaskId() { return attemptId.getTaskID(); }
-  /** Get the attempt id */
+  /** Gets the task ID. */
+  public TaskID getTaskId() {
+    return attemptId.getTaskID();
+  }
+  /** Gets the attempt id. */
   public TaskAttemptID getAttemptId() {
     return attemptId;
   }
 
-  /** Get the task type */
+  /** Gets the task type. */
   public TaskType getTaskType() {
     return taskType;
   }
-  /** Get the task status */
+  /** Gets the task status. */
   public String getTaskStatus() { return taskStatus.toString(); }
-  /** Get the map phase finish time */
+  /** Gets the map phase finish time. */
   public long getMapFinishTime() { return mapFinishTime; }
-  /** Get the attempt finish time */
+  /** Gets the attempt finish time. */
   public long getFinishTime() { return finishTime; }
-  /** Get the host name */
+  /**
+   * Gets the task attempt start time.
+   * @return task attempt start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  /** Gets the host name. */
   public String getHostname() { return hostname.toString(); }
-  /** Get the tracker rpc port */
+  /** Gets the tracker rpc port. */
   public int getPort() { return port; }
   
-  /** Get the rack name */
+  /** Gets the rack name. */
   public String getRackName() {
     return rackName == null ? null : rackName.toString();
   }
-  
-  /** Get the state string */
-  public String getState() { return state.toString(); }
-  /** Get the counters */
-  Counters getCounters() { return counters; }
-  /** Get the event type */
+  /**
+   * Gets the attempt state string.
+   * @return map attempt state
+   */
+  public String getState() {
+    return state.toString();
+  }
+  /**
+   * Gets the counters.
+   * @return counters
+   */
+  Counters getCounters() {
+    return counters;
+  }
+  /** Gets the event type. */
    public EventType getEventType() {
     return EventType.MAP_ATTEMPT_FINISHED;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index 9c0f09b..5a16f83 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
  * Event to record successful completion of a reduce attempt
@@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
   int[] cpuUsages;
   int[] vMemKbytes;
   int[] physMemKbytes;
+  private long startTime;
 
   /**
    * Create an event to record completion of a reduce attempt
@@ -76,13 +78,13 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.
    *        Currently there are four; wallclock time, CPU time,
-   *        virtual memory and physical memory.  
+   *        virtual memory and physical memory.
+   * @param startTs Task start time to be used for writing entity to ATSv2.
    */
-  public ReduceAttemptFinishedEvent
-    (TaskAttemptID id, TaskType taskType, String taskStatus, 
-     long shuffleFinishTime, long sortFinishTime, long finishTime,
-     String hostname, int port,  String rackName, String state, 
-     Counters counters, int[][] allSplits) {
+  public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long shuffleFinishTime, long sortFinishTime,
+      long finishTime, String hostname, int port,  String rackName,
+      String state, Counters counters, int[][] allSplits, long startTs) {
     this.attemptId = id;
     this.taskType = taskType;
     this.taskStatus = taskStatus;
@@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
     this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
     this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
     this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+    this.startTime = startTs;
+  }
+
+  public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long shuffleFinishTime, long sortFinishTime,
+      long finishTime, String hostname, int port,  String rackName,
+      String state, Counters counters, int[][] allSplits) {
+    this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime,
+        finishTime, hostname, port, rackName, state, counters, allSplits,
+        SystemClock.getInstance().getTime());
   }
 
   /**
@@ -118,13 +130,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
    * @param state State of the attempt
    * @param counters Counters for the attempt
    */
-  public ReduceAttemptFinishedEvent
-    (TaskAttemptID id, TaskType taskType, String taskStatus, 
-     long shuffleFinishTime, long sortFinishTime, long finishTime,
-     String hostname, String state, Counters counters) {
+  public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long shuffleFinishTime, long sortFinishTime,
+      long finishTime, String hostname, String state, Counters counters) {
     this(id, taskType, taskStatus,
-         shuffleFinishTime, sortFinishTime, finishTime,
-         hostname, -1, "", state, counters, null);
+        shuffleFinishTime, sortFinishTime, finishTime,
+        hostname, -1, "", state, counters, null);
   }
 
   ReduceAttemptFinishedEvent() {}
@@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
     this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
   }
 
-  /** Get the Task ID */
+  /** Gets the Task ID. */
   public TaskID getTaskId() { return attemptId.getTaskID(); }
-  /** Get the attempt id */
+  /** Gets the attempt id. */
   public TaskAttemptID getAttemptId() {
     return attemptId;
   }
-  /** Get the task type */
+  /** Gets the task type. */
   public TaskType getTaskType() {
     return taskType;
   }
-  /** Get the task status */
+  /** Gets the task status. */
   public String getTaskStatus() { return taskStatus.toString(); }
-  /** Get the finish time of the sort phase */
+  /** Gets the finish time of the sort phase. */
   public long getSortFinishTime() { return sortFinishTime; }
-  /** Get the finish time of the shuffle phase */
+  /** Gets the finish time of the shuffle phase. */
   public long getShuffleFinishTime() { return shuffleFinishTime; }
-  /** Get the finish time of the attempt */
+  /** Gets the finish time of the attempt. */
   public long getFinishTime() { return finishTime; }
-  /** Get the name of the host where the attempt ran */
+  /**
+   * Gets the start time.
+   * @return task attempt start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  /** Gets the name of the host where the attempt ran. */
   public String getHostname() { return hostname.toString(); }
-  /** Get the tracker rpc port */
+  /** Gets the tracker rpc port. */
   public int getPort() { return port; }
   
-  /** Get the rack name of the node where the attempt ran */
+  /** Gets the rack name of the node where the attempt ran. */
   public String getRackName() {
     return rackName == null ? null : rackName.toString();
   }
-  
-  /** Get the state string */
-  public String getState() { return state.toString(); }
-  /** Get the counters for the attempt */
-  Counters getCounters() { return counters; }
-  /** Get the event type */
+  /**
+   * Gets the state string.
+   * @return reduce attempt state
+   */
+  public String getState() {
+    return state.toString();
+  }
+  /**
+   * Gets the counters.
+   * @return counters
+   */
+  Counters getCounters() {
+    return counters;
+  }
+  /** Gets the event type. */
   public EventType getEventType() {
     return EventType.REDUCE_ATTEMPT_FINISHED;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index a931ca2..c28c216 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
  * Event to record successful task completion
@@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
   private String hostname;
   private String state;
   private Counters counters;
+  private long startTime;
 
   /**
-   * Create an event to record successful finishes for setup and cleanup 
-   * attempts
+   * Create an event to record successful finishes for setup and cleanup
+   * attempts.
    * @param id Attempt ID
    * @param taskType Type of task
    * @param taskStatus Status of task
@@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
    * @param hostname Host where the attempt executed
    * @param state State string
    * @param counters Counters for the attempt
+   * @param startTs Task start time to be used for writing entity to ATSv2.
    */
   public TaskAttemptFinishedEvent(TaskAttemptID id, 
       TaskType taskType, String taskStatus, 
       long finishTime, String rackName,
-      String hostname, String state, Counters counters) {
+      String hostname, String state, Counters counters, long startTs) {
     this.attemptId = id;
     this.taskType = taskType;
     this.taskStatus = taskStatus;
@@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
     this.hostname = hostname;
     this.state = state;
     this.counters = counters;
+    this.startTime = startTs;
+  }
+
+  public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+      String taskStatus, long finishTime, String rackName, String hostname,
+      String state, Counters counters) {
+    this(id, taskType, taskStatus, finishTime, rackName, hostname, state,
+        counters, SystemClock.getInstance().getTime());
   }
 
   TaskAttemptFinishedEvent() {}
@@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
     this.counters = EventReader.fromAvro(datum.getCounters());
   }
 
-  /** Get the task ID */
+  /** Gets the task ID. */
   public TaskID getTaskId() { return attemptId.getTaskID(); }
-  /** Get the task attempt id */
+  /** Gets the task attempt id. */
   public TaskAttemptID getAttemptId() {
     return attemptId;
   }
-  /** Get the task type */
+  /** Gets the task type. */
   public TaskType getTaskType() {
     return taskType;
   }
-  /** Get the task status */
+  /** Gets the task status. */
   public String getTaskStatus() { return taskStatus.toString(); }
-  /** Get the attempt finish time */
+  /** Gets the attempt finish time. */
   public long getFinishTime() { return finishTime; }
-  /** Get the host where the attempt executed */
+  /**
+   * Gets the task attempt start time to be used while publishing to ATSv2.
+   * @return task attempt start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  /** Gets the host where the attempt executed. */
   public String getHostname() { return hostname.toString(); }
   
-  /** Get the rackname where the attempt executed */
+  /** Gets the rackname where the attempt executed. */
   public String getRackName() {
     return rackName == null ? null : rackName.toString();
   }
   
-  /** Get the state string */
+  /**
+   * Gets the state string.
+   * @return task attempt state.
+   */
   public String getState() { return state.toString(); }
-  /** Get the counters for the attempt */
+  /** Gets the counters for the attempt. */
   Counters getCounters() { return counters; }
-  /** Get the event type */
+  /** Gets the event type. */
   public EventType getEventType() {
     // Note that the task type can be setup/map/reduce/cleanup but the 
     // attempt-type can only be map/reduce.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 1732d91..491700c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
  * Event to record unsuccessful (Killed/Failed) completion of task attempts
@@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
   int[] cpuUsages;
   int[] vMemKbytes;
   int[] physMemKbytes;
+  private long startTime;
   private static final Counters EMPTY_COUNTERS = new Counters();
 
   /** 
-   * Create an event to record the unsuccessful completion of attempts
+   * Create an event to record the unsuccessful completion of attempts.
    * @param id Attempt ID
    * @param taskType Type of the task
    * @param status Status of the attempt
@@ -74,13 +76,14 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.
    *        Currently there are four; wallclock time, CPU time,
-   *        virtual memory and physical memory.  
+   *        virtual memory and physical memory.
+   * @param startTs Task start time to be used for writing entity to ATSv2.
    */
   public TaskAttemptUnsuccessfulCompletionEvent
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime,
         String hostname, int port, String rackName,
-        String error, Counters counters, int[][] allSplits) {
+        String error, Counters counters, int[][] allSplits, long startTs) {
     this.attemptId = id;
     this.taskType = taskType;
     this.status = status;
@@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
         ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
     this.physMemKbytes =
         ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+    this.startTime = startTs;
+  }
+
+  public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
+      TaskType taskType, String status, long finishTime, String hostname,
+      int port, String rackName, String error, Counters counters,
+      int[][] allSplits) {
+    this(id, taskType, status, finishTime, hostname, port, rackName, error,
+        counters, allSplits, SystemClock.getInstance().getTime());
   }
 
   /** 
@@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
         AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
   }
 
-  /** Get the task id */
+  /** Gets the task id. */
   public TaskID getTaskId() {
     return attemptId.getTaskID();
   }
-  /** Get the task type */
+  /** Gets the task type. */
   public TaskType getTaskType() {
     return TaskType.valueOf(taskType.toString());
   }
-  /** Get the attempt id */
+  /** Gets the attempt id. */
   public TaskAttemptID getTaskAttemptId() {
     return attemptId;
   }
-  /** Get the finish time */
+  /** Gets the finish time. */
   public long getFinishTime() { return finishTime; }
-  /** Get the name of the host where the attempt executed */
+  /**
+   * Gets the task attempt start time to be used while publishing to ATSv2.
+   * @return task attempt start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  /** Gets the name of the host where the attempt executed. */
   public String getHostname() { return hostname; }
-  /** Get the rpc port for the host where the attempt executed */
+  /** Gets the rpc port for the host where the attempt executed. */
   public int getPort() { return port; }
   
-  /** Get the rack name of the node where the attempt ran */
+  /** Gets the rack name of the node where the attempt ran. */
   public String getRackName() {
     return rackName == null ? null : rackName.toString();
   }
   
-  /** Get the error string */
+  /** Gets the error string. */
   public String getError() { return error.toString(); }
-  /** Get the task status */
+  /**
+   * Gets the task attempt status.
+   * @return task attempt status.
+   */
   public String getTaskStatus() {
     return status.toString();
   }
-  /** Get the counters */
+  /** Gets the counters. */
   Counters getCounters() { return counters; }
-  /** Get the event type */
+  /** Gets the event type. */
   public EventType getEventType() {
     // Note that the task type can be setup/map/reduce/cleanup but the 
     // attempt-type can only be map/reduce.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
index d14350d..b4d9e41 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
  * Event to record the failure of a task
@@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent {
   private String status;
   private String error;
   private Counters counters;
+  private long startTime;
 
   private static final Counters EMPTY_COUNTERS = new Counters();
 
   /**
-   * Create an event to record task failure
+   * Create an event to record task failure.
    * @param id Task ID
    * @param finishTime Finish time of the task
    * @param taskType Type of the task
@@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent {
    * @param status Status
    * @param failedDueToAttempt The attempt id due to which the task failed
    * @param counters Counters for the task
+   * @param startTs task start time.
    */
   public TaskFailedEvent(TaskID id, long finishTime, 
       TaskType taskType, String error, String status,
-      TaskAttemptID failedDueToAttempt, Counters counters) {
+      TaskAttemptID failedDueToAttempt, Counters counters, long startTs) {
     this.id = id;
     this.finishTime = finishTime;
     this.taskType = taskType;
@@ -72,15 +75,23 @@ public class TaskFailedEvent implements HistoryEvent {
     this.status = status;
     this.failedDueToAttempt = failedDueToAttempt;
     this.counters = counters;
+    this.startTime = startTs;
+  }
+
+  public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType,
+      String error, String status, TaskAttemptID failedDueToAttempt,
+      Counters counters) {
+    this(id, finishTime, taskType, error, status, failedDueToAttempt, counters,
+        SystemClock.getInstance().getTime());
   }
 
   public TaskFailedEvent(TaskID id, long finishTime, 
-	      TaskType taskType, String error, String status,
-	      TaskAttemptID failedDueToAttempt) {
-    this(id, finishTime, taskType, error, status,
-        failedDueToAttempt, EMPTY_COUNTERS);
+      TaskType taskType, String error, String status,
+      TaskAttemptID failedDueToAttempt) {
+    this(id, finishTime, taskType, error, status, failedDueToAttempt,
+        EMPTY_COUNTERS);
   }
-  
+
   TaskFailedEvent() {}
 
   public Object getDatum() {
@@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent {
         EventReader.fromAvro(datum.getCounters());
   }
 
-  /** Get the task id */
+  /** Gets the task id. */
   public TaskID getTaskId() { return id; }
-  /** Get the error string */
+  /** Gets the error string. */
   public String getError() { return error; }
-  /** Get the finish time of the attempt */
+  /** Gets the finish time of the attempt. */
   public long getFinishTime() {
     return finishTime;
   }
-  /** Get the task type */
+  /**
+   * Gets the task start time to be reported to ATSv2.
+   * @return task start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  /** Gets the task type. */
   public TaskType getTaskType() {
     return taskType;
   }
-  /** Get the attempt id due to which the task failed */
+  /** Gets the attempt id due to which the task failed. */
   public TaskAttemptID getFailedAttemptID() {
     return failedDueToAttempt;
   }
-  /** Get the task status */
+  /**
+   * Gets the task status.
+   * @return task status
+   */
   public String getTaskStatus() { return status; }
-  /** Get task counters */
+  /** Gets task counters. */
   public Counters getCounters() { return counters; }
-  /** Get the event type */
+  /** Gets the event type. */
   public EventType getEventType() {
     return EventType.TASK_FAILED;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index 0bc4383..97557c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 /**
  * Event to record the successful completion of a task
@@ -49,27 +50,36 @@ public class TaskFinishedEvent implements HistoryEvent {
   private TaskType taskType;
   private String status;
   private Counters counters;
-  
+  private long startTime;
+
   /**
-   * Create an event to record the successful completion of a task
+   * Create an event to record the successful completion of a task.
    * @param id Task ID
    * @param attemptId Task Attempt ID of the successful attempt for this task
    * @param finishTime Finish time of the task
    * @param taskType Type of the task
    * @param status Status string
    * @param counters Counters for the task
+   * @param startTs task start time
    */
   public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
                            TaskType taskType,
-                           String status, Counters counters) {
+                           String status, Counters counters, long startTs) {
     this.taskid = id;
     this.successfulAttemptId = attemptId;
     this.finishTime = finishTime;
     this.taskType = taskType;
     this.status = status;
     this.counters = counters;
+    this.startTime = startTs;
   }
-  
+
+  public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
+          TaskType taskType, String status, Counters counters) {
+    this(id, attemptId, finishTime, taskType, status, counters,
+        SystemClock.getInstance().getTime());
+  }
+
   TaskFinishedEvent() {}
 
   public Object getDatum() {
@@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent {
     this.counters = EventReader.fromAvro(datum.getCounters());
   }
 
-  /** Get task id */
+  /** Gets task id. */
   public TaskID getTaskId() { return taskid; }
-  /** Get successful task attempt id */
+  /** Gets successful task attempt id. */
   public TaskAttemptID getSuccessfulTaskAttemptId() {
     return successfulAttemptId;
   }
-  /** Get the task finish time */
+  /** Gets the task finish time. */
   public long getFinishTime() { return finishTime; }
-  /** Get task counters */
+  /**
+   * Gets the task start time to be reported to ATSv2.
+   * @return task start time
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  /** Gets task counters. */
   public Counters getCounters() { return counters; }
-  /** Get task type */
+  /** Gets task type. */
   public TaskType getTaskType() {
     return taskType;
   }
-  /** Get task status */
+  /**
+   * Gets task status.
+   * @return task status
+   */
   public String getTaskStatus() { return status.toString(); }
-  /** Get event type */
+  /** Gets event type. */
   public EventType getEventType() {
     return EventType.TASK_FINISHED;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index 53ad6bc..010413b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -296,10 +296,10 @@ public class TestMRTimelineEventHandling {
         " does not exist.",
         jobEventFile.exists());
     verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
-        true, false, null);
+        true, false, null, false);
     Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
         "huge_dummy_conf1", "huge_dummy_conf2");
-    verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
+    verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false);
 
     // for this test, we expect MR job metrics are published in YARN_APPLICATION
     String outputAppDir =
@@ -320,8 +320,8 @@ public class TestMRTimelineEventHandling {
         "appEventFilePath: " + appEventFilePath +
         " does not exist.",
         appEventFile.exists());
-    verifyEntity(appEventFile, null, true, false, null);
-    verifyEntity(appEventFile, null, false, true, cfgsToCheck);
+    verifyEntity(appEventFile, null, true, false, null, false);
+    verifyEntity(appEventFile, null, false, true, cfgsToCheck, false);
 
     // check for task event file
     String outputDirTask =
@@ -342,7 +342,7 @@ public class TestMRTimelineEventHandling {
         " does not exist.",
         taskEventFile.exists());
     verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
-        true, false, null);
+        true, false, null, true);
 
     // check for task attempt event file
     String outputDirTaskAttempt =
@@ -361,7 +361,7 @@ public class TestMRTimelineEventHandling {
     Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
         " does not exist.", taskAttemptEventFile.exists());
     verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
-        true, false, null);
+        true, false, null, true);
   }
 
   /**
@@ -378,12 +378,13 @@ public class TestMRTimelineEventHandling {
    * @throws IOException
    */
   private void verifyEntity(File entityFile, String eventId,
-      boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
-      throws IOException {
+      boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify,
+      boolean checkIdPrefix) throws IOException {
     BufferedReader reader = null;
     String strLine;
     try {
       reader = new BufferedReader(new FileReader(entityFile));
+      long idPrefix = -1;
       while ((strLine = reader.readLine()) != null) {
         if (strLine.trim().length() > 0) {
           org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@@ -392,6 +393,19 @@ public class TestMRTimelineEventHandling {
                       strLine.trim(),
                       org.apache.hadoop.yarn.api.records.timelineservice.
                           TimelineEntity.class);
+
+          LOG.info("strLine.trim()= " + strLine.trim());
+          if (checkIdPrefix) {
+            Assert.assertTrue("Entity ID prefix expected to be > 0" ,
+                entity.getIdPrefix() > 0);
+            if (idPrefix == -1) {
+              idPrefix = entity.getIdPrefix();
+            } else {
+              Assert.assertEquals("Entity ID prefix should be same across " +
+                  "each publish of same entity",
+                      idPrefix, entity.getIdPrefix());
+            }
+          }
           if (eventId == null) {
             // Job metrics are published without any events for
             // ApplicationEntity. There is also possibility that some other

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 17dae6b..f7a4195 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -103,6 +103,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.LogManager;
 
@@ -307,6 +309,17 @@ public class ApplicationMaster {
       Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
 
   /**
+   * Container start times used to set id prefix while publishing entity
+   * to ATSv2.
+   */
+  private final ConcurrentMap<ContainerId, Long> containerStartTimes =
+      new ConcurrentHashMap<ContainerId, Long>();
+
+  private ConcurrentMap<ContainerId, Long> getContainerStartTimes() {
+    return containerStartTimes;
+  }
+
+  /**
    * @param args Command line args
    */
   public static void main(String[] args) {
@@ -855,7 +868,15 @@ public class ApplicationMaster {
         }
         if(timelineClient != null) {
           if (timelineServiceV2) {
-            publishContainerEndEventOnTimelineServiceV2(containerStatus);
+            Long containerStartTime =
+                containerStartTimes.get(containerStatus.getContainerId());
+            if (containerStartTime == null) {
+              containerStartTime = SystemClock.getInstance().getTime();
+              containerStartTimes.put(containerStatus.getContainerId(),
+                  containerStartTime);
+            }
+            publishContainerEndEventOnTimelineServiceV2(containerStatus,
+                containerStartTime);
           } else {
             publishContainerEndEvent(
                 timelineClient, containerStatus, domainId, appSubmitterUgi);
@@ -985,8 +1006,11 @@ public class ApplicationMaster {
       }
       if(applicationMaster.timelineClient != null) {
         if (applicationMaster.timelineServiceV2) {
+          long startTime = SystemClock.getInstance().getTime();
+          applicationMaster.getContainerStartTimes().put(
+              containerId, startTime);
           applicationMaster.publishContainerStartEventOnTimelineServiceV2(
-              container);
+              container, startTime);
         } else {
           applicationMaster.publishContainerStartEvent(
               applicationMaster.timelineClient, container,
@@ -1348,24 +1372,24 @@ public class ApplicationMaster {
   }
 
   private void publishContainerStartEventOnTimelineServiceV2(
-      Container container) {
+      Container container, long startTime) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
         entity =
             new org.apache.hadoop.yarn.api.records.timelineservice.
             TimelineEntity();
     entity.setId(container.getId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
-    long ts = System.currentTimeMillis();
-    entity.setCreatedTime(ts);
+    entity.setCreatedTime(startTime);
     entity.addInfo("user", appSubmitterUgi.getShortUserName());
 
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
-    event.setTimestamp(ts);
+    event.setTimestamp(startTime);
     event.setId(DSEvent.DS_CONTAINER_START.toString());
     event.addInfo("Node", container.getNodeId().toString());
     event.addInfo("Resources", container.getResource().toString());
     entity.addEvent(event);
+    entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime));
 
     try {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -1383,7 +1407,7 @@ public class ApplicationMaster {
   }
 
   private void publishContainerEndEventOnTimelineServiceV2(
-      final ContainerStatus container) {
+      final ContainerStatus container, long containerStartTime) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
         entity =
             new org.apache.hadoop.yarn.api.records.timelineservice.
@@ -1399,6 +1423,7 @@ public class ApplicationMaster {
     event.addInfo("State", container.getState().name());
     event.addInfo("Exit Status", container.getExitStatus());
     entity.addEvent(event);
+    entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
 
     try {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -1433,6 +1458,8 @@ public class ApplicationMaster {
     event.setId(appEvent.toString());
     event.setTimestamp(ts);
     entity.addEvent(event);
+    entity.setIdPrefix(
+        TimelineServiceHelper.invertLong(appAttemptID.getAttemptId()));
 
     try {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 300ea67..47485ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
 import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
@@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@@ -523,15 +526,31 @@ public class TestDistributedShell {
           "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
               + "_000001"
               + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
-          appTimestampFileName);
-
-      // Verify DS_CONTAINER entities posted by the client
+      File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
+          "DS_APP_ATTEMPT", appTimestampFileName);
+      // Check if required events are published and the same idprefix is
+      // sent on each publish.
+      verifyEntityForTimelineV2(dsAppAttemptEntityFile,
+          DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
+      // to avoid race condition of testcase, at least check 40 times with sleep
+      // of 50ms
+      verifyEntityForTimelineV2(dsAppAttemptEntityFile,
+          DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
+
+      // Verify DS_CONTAINER entities posted by the client.
       String containerTimestampFileName =
           "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
               + "_01_000002.thist";
-      verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
-          containerTimestampFileName);
+      File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
+          "DS_CONTAINER", containerTimestampFileName);
+      // Check if required events are published and the same idprefix is
+      // sent on each publish.
+      verifyEntityForTimelineV2(dsContainerEntityFile,
+          DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
+      // to avoid race condition of testcase, at least check 40 times with sleep
+      // of 50ms
+      verifyEntityForTimelineV2(dsContainerEntityFile,
+          DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
 
       // Verify NM posting container metrics info.
       String containerMetricsTimestampFileName =
@@ -541,29 +560,13 @@ public class TestDistributedShell {
       File containerEntityFile = verifyEntityTypeFileExists(basePath,
           TimelineEntityType.YARN_CONTAINER.toString(),
           containerMetricsTimestampFileName);
-      Assert.assertEquals(
-          "Container created event needs to be published atleast once",
-          1,
-          getNumOfStringOccurences(containerEntityFile,
-              ContainerMetricsConstants.CREATED_EVENT_TYPE));
-
-      // to avoid race condition of testcase, atleast check 4 times with sleep
-      // of 500ms
-      long numOfContainerFinishedOccurences = 0;
-      for (int i = 0; i < 4; i++) {
-        numOfContainerFinishedOccurences =
-            getNumOfStringOccurences(containerEntityFile,
-                ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-        if (numOfContainerFinishedOccurences > 0) {
-          break;
-        } else {
-          Thread.sleep(500L);
-        }
-      }
-      Assert.assertEquals(
-          "Container finished event needs to be published atleast once",
-          1,
-          numOfContainerFinishedOccurences);
+      verifyEntityForTimelineV2(containerEntityFile,
+          ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
+
+      // to avoid race condition of testcase, at least check 40 times with sleep
+      // of 50ms
+      verifyEntityForTimelineV2(containerEntityFile,
+          ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
 
       // Verify RM posting Application life cycle Events are getting published
       String appMetricsTimestampFileName =
@@ -573,29 +576,14 @@ public class TestDistributedShell {
           verifyEntityTypeFileExists(basePath,
               TimelineEntityType.YARN_APPLICATION.toString(),
               appMetricsTimestampFileName);
-      Assert.assertEquals(
-          "Application created event should be published atleast once",
-          1,
-          getNumOfStringOccurences(appEntityFile,
-              ApplicationMetricsConstants.CREATED_EVENT_TYPE));
-
-      // to avoid race condition of testcase, atleast check 4 times with sleep
-      // of 500ms
-      long numOfStringOccurences = 0;
-      for (int i = 0; i < 4; i++) {
-        numOfStringOccurences =
-            getNumOfStringOccurences(appEntityFile,
-                ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-        if (numOfStringOccurences > 0) {
-          break;
-        } else {
-          Thread.sleep(500L);
-        }
-      }
-      Assert.assertEquals(
-          "Application finished event should be published atleast once",
-          1,
-          numOfStringOccurences);
+      // No need to check idprefix for app.
+      verifyEntityForTimelineV2(appEntityFile,
+          ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
+
+      // to avoid race condition of testcase, at least check 40 times with sleep
+      // of 50ms
+      verifyEntityForTimelineV2(appEntityFile,
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
 
       // Verify RM posting AppAttempt life cycle Events are getting published
       String appAttemptMetricsTimestampFileName =
@@ -606,17 +594,10 @@ public class TestDistributedShell {
           verifyEntityTypeFileExists(basePath,
               TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
               appAttemptMetricsTimestampFileName);
-      Assert.assertEquals(
-          "AppAttempt register event should be published atleast once",
-          1,
-          getNumOfStringOccurences(appAttemptEntityFile,
-              AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
-
-      Assert.assertEquals(
-          "AppAttempt finished event should be published atleast once",
-          1,
-          getNumOfStringOccurences(appAttemptEntityFile,
-              AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
+      verifyEntityForTimelineV2(appAttemptEntityFile,
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
+      verifyEntityForTimelineV2(appAttemptEntityFile,
+          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
     } finally {
       FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
     }
@@ -636,22 +617,64 @@ public class TestDistributedShell {
     return entityFile;
   }
 
-  private long getNumOfStringOccurences(File entityFile, String searchString)
-      throws IOException {
-    BufferedReader reader = null;
-    String strLine;
+  /**
+   * Checks the events and idprefix published for an entity.
+   *
+   * @param entityFile Entity file.
+   * @param expectedEvent Expected event Id.
+   * @param numOfExpectedEvent Number of expected occurrences of expected event
+   *     id.
+   * @param checkTimes Number of times to check.
+   * @param sleepTime Sleep time for each iteration.
+   * @param checkIdPrefix Whether to check idprefix.
+   * @throws IOException if entity file reading fails.
+   * @throws InterruptedException if sleep is interrupted.
+   */
+  private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
+      long numOfExpectedEvent, int checkTimes, long sleepTime,
+      boolean checkIdPrefix) throws IOException, InterruptedException {
     long actualCount = 0;
-    try {
-      reader = new BufferedReader(new FileReader(entityFile));
-      while ((strLine = reader.readLine()) != null) {
-        if (strLine.trim().contains(searchString)) {
-          actualCount++;
+    for (int i = 0; i < checkTimes; i++) {
+      BufferedReader reader = null;
+      String strLine = null;
+      actualCount = 0;
+      try {
+        reader = new BufferedReader(new FileReader(entityFile));
+        long idPrefix = -1;
+        while ((strLine = reader.readLine()) != null) {
+          String entityLine = strLine.trim();
+          if (entityLine.isEmpty()) {
+            continue;
+          }
+          if (entityLine.contains(expectedEvent)) {
+            actualCount++;
+          }
+          if (checkIdPrefix) {
+            TimelineEntity entity = FileSystemTimelineReaderImpl.
+                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
+            Assert.assertTrue("Entity ID prefix expected to be > 0" ,
+                entity.getIdPrefix() > 0);
+            if (idPrefix == -1) {
+              idPrefix = entity.getIdPrefix();
+            } else {
+              Assert.assertEquals("Entity ID prefix should be same across " +
+                  "each publish of same entity",
+                      idPrefix, entity.getIdPrefix());
+            }
+          }
         }
+      } finally {
+        reader.close();
+      }
+      if (numOfExpectedEvent == actualCount) {
+        break;
+      }
+      if (sleepTime > 0 && i < checkTimes - 1) {
+        Thread.sleep(sleepTime);
       }
-    } finally {
-      reader.close();
     }
-    return actualCount;
+    Assert.assertEquals("Unexpected number of " +  expectedEvent +
+        " event published.", numOfExpectedEvent, actualCount);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index aaa15bf..bd33dd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -144,6 +144,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
@@ -966,10 +967,11 @@ public class ContainerManagerImpl extends CompositeService implements
     Credentials credentials =
         YarnServerSecurityUtils.parseCredentials(launchContext);
 
+    long containerStartTime = SystemClock.getInstance().getTime();
     Container container =
         new ContainerImpl(getConfig(), this.dispatcher,
             launchContext, credentials, metrics, containerTokenIdentifier,
-            context);
+            context, containerStartTime);
     ApplicationId applicationID =
         containerId.getApplicationAttemptId().getApplicationId();
     if (context.getContainers().putIfAbsent(containerId, container) != null) {
@@ -1022,7 +1024,7 @@ public class ContainerManagerImpl extends CompositeService implements
         }
 
         this.context.getNMStateStore().storeContainer(containerId,
-            containerTokenIdentifier.getVersion(), request);
+            containerTokenIdentifier.getVersion(), containerStartTime, request);
         dispatcher.getEventHandler().handle(
           new ApplicationContainerInitEvent(container));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
index 0a8ffdf..09c946b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
@@ -23,12 +23,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 
 public class ApplicationContainerFinishedEvent extends ApplicationEvent {
   private ContainerStatus containerStatus;
+  // Required by NMTimelinePublisher.
+  private long containerStartTime;
 
-  public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
+  public ApplicationContainerFinishedEvent(ContainerStatus containerStatus,
+      long containerStartTs) {
     super(containerStatus.getContainerId().getApplicationAttemptId().
         getApplicationId(),
         ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
     this.containerStatus = containerStatus;
+    this.containerStartTime = containerStartTs;
   }
 
   public ContainerId getContainerID() {
@@ -39,4 +43,7 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent {
     return containerStatus;
   }
 
+  public long getContainerStartTime() {
+    return containerStartTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 448a14c..97c4c7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -37,6 +37,8 @@ public interface Container extends EventHandler<ContainerEvent> {
 
   ContainerId getContainerId();
 
+  long getContainerStartTime();
+
   Resource getResource();
 
   void setResource(Resource targetResource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c0cc0eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index cf5a304..6f9f714 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -167,10 +167,10 @@ public class ContainerImpl implements Container {
 
   /** The NM-wide configuration - not specific to this container */
   private final Configuration daemonConf;
+  private final long startTime;
 
   private static final Log LOG = LogFactory.getLog(ContainerImpl.class);
 
-
   // whether container has been recovered after a restart
   private RecoveredContainerStatus recoveredStatus =
       RecoveredContainerStatus.REQUESTED;
@@ -183,6 +183,16 @@ public class ContainerImpl implements Container {
       ContainerLaunchContext launchContext, Credentials creds,
       NodeManagerMetrics metrics,
       ContainerTokenIdentifier containerTokenIdentifier, Context context) {
+    this(conf, dispatcher, launchContext, creds, metrics,
+        containerTokenIdentifier, context, SystemClock.getInstance().getTime());
+  }
+
+  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+      ContainerLaunchContext launchContext, Credentials creds,
+      NodeManagerMetrics metrics,
+      ContainerTokenIdentifier containerTokenIdentifier, Context context,
+      long startTs) {
+    this.startTime = startTs;
     this.daemonConf = conf;
     this.dispatcher = dispatcher;
     this.stateStore = context.getNMStateStore();
@@ -256,7 +266,7 @@ public class ContainerImpl implements Container {
       ContainerTokenIdentifier containerTokenIdentifier, Context context,
       RecoveredContainerState rcs) {
     this(conf, dispatcher, launchContext, creds, metrics,
-        containerTokenIdentifier, context);
+        containerTokenIdentifier, context, rcs.getStartTime());
     this.recoveredStatus = rcs.getStatus();
     this.exitCode = rcs.getExitCode();
     this.recoveredAsKilled = rcs.getKilled();
@@ -628,6 +638,11 @@ public class ContainerImpl implements Container {
   }
 
   @Override
+  public long getContainerStartTime() {
+    return this.startTime;
+  }
+
+  @Override
   public Resource getResource() {
     return Resources.clone(this.resource);
   }
@@ -687,7 +702,8 @@ public class ContainerImpl implements Container {
     EventHandler eventHandler = dispatcher.getEventHandler();
 
     ContainerStatus containerStatus = cloneAndGetContainerStatus();
-    eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
+    eventHandler.handle(
+        new ApplicationContainerFinishedEvent(containerStatus, startTime));
 
     // Remove the container from the resource-monitor
     eventHandler.handle(new ContainerStopMonitoringEvent(containerId));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message