tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2646. Add scheduling causal dependency for attempts (bikas)
Date Mon, 03 Aug 2015 18:38:21 GMT
Repository: tez
Updated Branches:
  refs/heads/master f37699f1a -> 4d381d778


TEZ-2646. Add scheduling causal dependency for attempts (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4d381d77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4d381d77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4d381d77

Branch: refs/heads/master
Commit: 4d381d778f82d8d17ea9f718bbac3938ce4cc40d
Parents: f37699f
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Aug 3 11:38:15 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Aug 3 11:38:15 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  1 +
 .../dag/app/dag/event/TaskEventTAUpdate.java    | 14 ++++-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 29 ++++++++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 61 +++++++++++++++-----
 .../history/events/TaskAttemptStartedEvent.java | 43 ++++++++++----
 .../impl/HistoryEventJsonConversion.java        |  4 ++
 tez-dag/src/main/proto/HistoryEvents.proto      |  2 +
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  2 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 41 +++++++++++--
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 12 ++--
 .../TestHistoryEventsProtoConversion.java       |  9 ++-
 .../impl/TestHistoryEventJsonConversion.java    |  2 +-
 .../parser/datamodel/TaskAttemptInfo.java       |  7 ++-
 .../apache/tez/history/TestATSFileParser.java   | 15 +++++
 .../ats/HistoryEventTimelineConversion.java     |  5 ++
 .../ats/TestHistoryEventTimelineConversion.java |  9 ++-
 17 files changed, 209 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fdd3ba9..303171a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2048. Remove VertexManagerPluginContext.getTaskContainer()
  TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
   TEZ-2468. Change the minimum Java version to Java 7.
+  TEZ-2646. Add scheduling causal dependency for attempts
 
 ALL CHANGES:
  TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index fd82e20..4bf9f6d 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -82,6 +82,7 @@ public class ATSConstants {
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";
   public static final String EXIT_STATUS = "exitStatus";
   public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
+  public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt";
 
   /* Counters-related keys */
   public static final String COUNTER_GROUPS = "counterGroups";

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
index 59c7363..01eaf5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
@@ -18,19 +18,31 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
+@SuppressWarnings("rawtypes")
 public class TaskEventTAUpdate extends TaskEvent {
 
   private TezTaskAttemptID attemptID;
+  private TezAbstractEvent causalEvent;
 
   public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) {
+    this(id, type, null);
+  }
+
+  public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) {
     super(id.getTaskID(), type);
     this.attemptID = id;
+    this.causalEvent = causalEvent;
   }
-
+  
   public TezTaskAttemptID getTaskAttemptID() {
     return attemptID;
   }
+  
+  public TezAbstractEvent getCausalEvent() {
+    return causalEvent;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index f015155..40636dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -165,6 +165,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Resource taskResource;
   private final ContainerContext containerContext;
   private final boolean leafVertex;
+  
+  private TezTaskAttemptID schedulingCausalTA;
+  private long scheduledTime;
 
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
@@ -374,12 +377,22 @@ public class TaskAttemptImpl implements TaskAttempt,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
       Task task) {
+    this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+        taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
+        task, null);
+  }
+  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+      TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
+      boolean isRescheduled,
+      Resource resource, ContainerContext containerContext, boolean leafVertex,
+      Task task, TezTaskAttemptID schedulingCausalTA) {
 
-    this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
+    MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
 
-    this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
+    MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -394,6 +407,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.appContext = appContext;
     this.task = task;
     this.vertex = this.task.getVertex();
+    this.schedulingCausalTA = schedulingCausalTA;
+    this.scheduledTime = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
@@ -425,6 +440,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   public TezDAGID getDAGID() {
     return getVertexID().getDAGId();
   }
+  
+  public TezTaskAttemptID getSchedulingCausalTA() {
+    return schedulingCausalTA;
+  }
 
   TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
     TaskSpec baseTaskSpec = task.getBaseTaskSpec();
@@ -716,6 +735,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         {
           TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
           this.launchTime = tEvent.getStartTime();
+          this.scheduledTime = tEvent.getScheduledTime();
+          this.schedulingCausalTA = tEvent.getSchedulingCausalTA();
           recoveryStartEventSeen = true;
           recoveredState = TaskAttemptState.RUNNING;
           this.containerId = tEvent.getContainerId();
@@ -936,7 +957,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getVertex().getName(),
         launchTime, containerId, containerNodeId,
-        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
+        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, scheduledTime, schedulingCausalTA);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), startEvt));
   }
@@ -1076,7 +1097,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           .getTaskAttemptState());
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
-          .getTaskEventType()));
+          .getTaskEventType(), event));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ef8e33a..e6027f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -70,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
@@ -549,7 +550,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private TaskAttempt createRecoveredTaskAttempt(TezTaskAttemptID tezTaskAttemptID) {
-    TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId());
+    TaskAttempt taskAttempt = createAttempt(tezTaskAttemptID.getId(), null);
     return taskAttempt;
   }
 
@@ -814,10 +815,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
   
-  TaskAttemptImpl createAttempt(int attemptNumber) {
+  TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
-        (failedAttempts > 0), taskResource, containerContext, leafVertex, this);
+        (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
   }
 
   @Override
@@ -834,8 +835,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
-    TaskAttempt attempt = createAttempt(attempts.size());
+  private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
+    TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
@@ -1048,7 +1049,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event;
       task.locationHint = scheduleEvent.getTaskLocationHint();
       task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
-      task.addAndScheduleAttempt();
+      // For now, initial scheduling dependency is due to vertex manager scheduling
+      task.addAndScheduleAttempt(null);
       task.scheduledTime = task.clock.getTime();
       task.logJobHistoryTaskStartedEvent();
     }
@@ -1066,7 +1068,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       LOG.info("Scheduling a redundant attempt for task " + task.taskId);
       task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
-      task.addAndScheduleAttempt();
+      TezTaskAttemptID earliestUnfinishedAttempt = null;
+      for (TaskAttempt ta : task.attempts.values()) {
+        // find the oldest running attempt
+        if (!ta.isFinished()) {
+          earliestUnfinishedAttempt = ta.getID();
+        }
+      }
+      task.addAndScheduleAttempt(earliestUnfinishedAttempt);
     }
   }
 
@@ -1143,9 +1152,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       // we KillWaitAttemptCompletedTransitionready have a spare
       task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
       task.getVertex().incrementKilledTaskAttemptCount();
-      if (task.getUncompletedAttemptsCount() == 0
-          && task.successfulAttempt == null) {
-        task.addAndScheduleAttempt();
+      if (task.shouldScheduleNewAttempt()) {
+        task.addAndScheduleAttempt(castEvent.getTaskAttemptID());
       }
     }
   }
@@ -1255,7 +1263,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           // If any incomplete, the running attempt will moved to failed and its
           // update will trigger a new attempt if possible
           if (task.attempts.size() == task.getFinishedAttemptsCount()) {
-            task.addAndScheduleAttempt();
+            task.addAndScheduleAttempt(null);
           }
           endState = TaskStateInternal.RUNNING;
           break;
@@ -1304,15 +1312,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       return task.getInternalState();
     }
   }
+  
+  private boolean shouldScheduleNewAttempt() {
+    return (getUncompletedAttemptsCount() == 0
+            && successfulAttempt == null);
+  }
 
   private static class AttemptFailedTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
+    private TezTaskAttemptID schedulingCausalTA;
+    
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       task.getVertex().incrementFailedTaskAttemptCount();
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+      schedulingCausalTA = castEvent.getTaskAttemptID();
       task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed,"
           + " info=" + task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics());
       if (task.commitAttempt != null &&
@@ -1327,12 +1343,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);
         // we don't need a new event if we already have a spare
-        if (task.getUncompletedAttemptsCount() == 0
-            && task.successfulAttempt == null) {
+        if (task.shouldScheduleNewAttempt()) {
           LOG.info("Scheduling new attempt for task: " + task.getTaskId()
               + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: "
               + task.maxFailedAttempts);
-          task.addAndScheduleAttempt();
+          task.addAndScheduleAttempt(getSchedulingCausalTA());
         }
       } else {
         LOG.info("Failing task: " + task.getTaskId()
@@ -1352,11 +1367,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     protected TaskStateInternal getDefaultState(TaskImpl task) {
       return task.getInternalState();
     }
+    
+    protected TezTaskAttemptID getSchedulingCausalTA() {
+      return schedulingCausalTA;
+    }
   }
 
   private static class TaskRetroactiveFailureTransition
       extends AttemptFailedTransition {
 
+    private TezTaskAttemptID schedulingCausalTA;
+
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       if (task.leafVertex) {
@@ -1386,6 +1407,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // succeeded state
         return TaskStateInternal.SUCCEEDED;
       }
+      
+      Preconditions.checkState(castEvent.getCausalEvent() != null);
+      TaskAttemptEventOutputFailed destinationEvent = 
+          (TaskAttemptEventOutputFailed) castEvent.getCausalEvent();
+      schedulingCausalTA = destinationEvent.getInputFailedEvent().getSourceInfo().getTaskAttemptID();
 
       // super.transition is mostly coded for the case where an
       //  UNcompleted task failed.  When a COMPLETED task retroactively
@@ -1402,6 +1428,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
       return returnState;
     }
+    
+    @Override
+    protected TezTaskAttemptID getSchedulingCausalTA() {
+      return schedulingCausalTA;
+    }
 
     @Override
     protected TaskStateInternal getDefaultState(TaskImpl task) {
@@ -1433,7 +1464,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // from the map splitInfo. So the bad node might be sent as a location
         // to the RM. But the RM would ignore that just like it would ignore
         // currently pending container requests affinitized to bad nodes.
-        task.addAndScheduleAttempt();
+        task.addAndScheduleAttempt(attemptId);
         return TaskStateInternal.SCHEDULED;
       } else {
         // nothing to do

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 36add86..8eb074d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -36,24 +36,28 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   private String inProgressLogsUrl;
   private String completedLogsUrl;
   private String vertexName;
-  private long startTime;
+  private long launchTime;
   private ContainerId containerId;
   private NodeId nodeId;
   private String nodeHttpAddress;
+  private TezTaskAttemptID schedulingCausalTA;
+  private long scheduledTime;
 
   public TaskAttemptStartedEvent(TezTaskAttemptID taId,
-      String vertexName, long startTime,
+      String vertexName, long launchTime,
       ContainerId containerId, NodeId nodeId,
       String inProgressLogsUrl, String completedLogsUrl,
-      String nodeHttpAddress) {
+      String nodeHttpAddress, long scheduledTime, TezTaskAttemptID schedulingCausalTA) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
-    this.startTime = startTime;
+    this.launchTime = launchTime;
     this.containerId = containerId;
     this.nodeId = nodeId;
     this.inProgressLogsUrl = inProgressLogsUrl;
     this.completedLogsUrl = completedLogsUrl;
     this.nodeHttpAddress = nodeHttpAddress;
+    this.scheduledTime = scheduledTime;
+    this.schedulingCausalTA = schedulingCausalTA;
   }
 
   public TaskAttemptStartedEvent() {
@@ -75,19 +79,27 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   public TaskAttemptStartedProto toProto() {
-    return TaskAttemptStartedProto.newBuilder()
-        .setTaskAttemptId(taskAttemptId.toString())
-        .setStartTime(startTime)
+    TaskAttemptStartedProto.Builder builder = TaskAttemptStartedProto.newBuilder();
+    builder.setTaskAttemptId(taskAttemptId.toString())
+        .setStartTime(launchTime)
         .setContainerId(containerId.toString())
         .setNodeId(nodeId.toString())
-        .build();
+        .setScheduledTime(scheduledTime);
+    if (schedulingCausalTA != null) {
+      builder.setSchedulingCausalTA(schedulingCausalTA.toString());
+    }
+    return builder.build();
   }
 
   public void fromProto(TaskAttemptStartedProto proto) {
     this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
-    this.startTime = proto.getStartTime();
+    this.launchTime = proto.getStartTime();
     this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
     this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
+    this.scheduledTime = proto.getScheduledTime();
+    if (proto.hasSchedulingCausalTA()) {
+      this.schedulingCausalTA = TezTaskAttemptID.fromString(proto.getSchedulingCausalTA());
+    }
   }
 
   @Override
@@ -108,7 +120,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   public String toString() {
     return "vertexName=" + vertexName
         + ", taskAttemptId=" + taskAttemptId
-        + ", startTime=" + startTime
+        + ", scheduledTime=" + scheduledTime
+        + ", startTime=" + launchTime
         + ", containerId=" + containerId
         + ", nodeId=" + nodeId
         + ", inProgressLogs=" + inProgressLogsUrl
@@ -120,7 +133,15 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   public long getStartTime() {
-    return startTime;
+    return launchTime;
+  }
+  
+  public long getScheduledTime() {
+    return scheduledTime;
+  }
+  
+  public TezTaskAttemptID getSchedulingCausalTA() {
+    return schedulingCausalTA;
   }
 
   public ContainerId getContainerId() {

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 07ce2f3..3fdfe0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -573,6 +573,10 @@ public class HistoryEventJsonConversion {
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
     otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+    if (event.getSchedulingCausalTA() != null) {
+      otherInfo.put(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, event.getSchedulingCausalTA().toString());
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 8af48b6..402349b 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -164,6 +164,8 @@ message TaskAttemptStartedProto {
   optional int64 start_time = 2;
   optional string container_id = 3;
   optional string node_id = 4;
+  optional int64 scheduled_time = 5;
+  optional string scheduling_causal_t_a = 6;
 }
 
 message TaskAttemptFinishedProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 0665b1e..d632aa3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -155,7 +155,7 @@ public class TestTaskAttemptRecovery {
   private void restoreFromTAStartEvent() {
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(TaskAttemptState.RUNNING, recoveredState);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 9e5d395..807f277 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -55,6 +55,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -71,6 +72,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -358,7 +360,10 @@ public class TestTaskImpl {
     LOG.info("--- START: testKillScheduledTaskAttempt ---");
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     killScheduledTaskAttempt(mockTask.getLastAttempt().getID());
+    // last killed attempt should be causal TA of next attempt
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
   }
 
   @Test(timeout = 5000)
@@ -382,8 +387,11 @@ public class TestTaskImpl {
     LOG.info("--- START: testKillRunningTaskAttempt ---");
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+    // last killed attempt should be causal TA of next attempt
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
   }
 
   /**
@@ -504,11 +512,15 @@ public class TestTaskImpl {
 
     // During the task attempt commit there is an exception which causes
     // the attempt to fail
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
+    assertEquals(1, mockTask.getAttemptList().size());
     failRunningTaskAttempt(mockTask.getLastAttempt().getID());
 
     assertEquals(2, mockTask.getAttemptList().size());
     assertEquals(1, mockTask.failedAttempts);
+    // last failed attempt should be the causal TA
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
 
     assertFalse("First attempt should not commit",
         mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
@@ -552,6 +564,7 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     
     // Add a speculative task attempt that succeeds
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
@@ -559,6 +572,11 @@ public class TestTaskImpl {
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
     
+    assertEquals(2, mockTask.getAttemptList().size());
+    
+    // previous running attempt should be the causal TA of this speculative attempt
+    Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
+    
     assertTrue("Second attempt should commit",
         mockTask.canCommit(mockTask.getAttemptList().get(1).getID()));
     assertFalse("First attempt should not commit",
@@ -601,8 +619,14 @@ public class TestTaskImpl {
 
     eventHandler.events.clear();
     // Now fail the attempt after it has succeeded
+    TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+    TezEvent mockTezEvent = mock(TezEvent.class);
+    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+    when(mockTezEvent.getSourceInfo()).thenReturn(meta);
+    TaskAttemptEventOutputFailed outputFailedEvent = 
+        new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
-        .getID(), TaskEventType.T_ATTEMPT_FAILED));
+        .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
 
     // The task should still be in the scheduled state
     assertTaskScheduledState();
@@ -610,6 +634,12 @@ public class TestTaskImpl {
     Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType());
     event = eventHandler.events.get(eventHandler.events.size()-1);
     Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+    
+    // report of output read error should be the causal TA
+    List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
+    Assert.assertEquals(2, attempts.size());
+    MockTaskAttemptImpl newAttempt = attempts.get(1);
+    Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
   }
 
   @Test(timeout = 5000)
@@ -679,11 +709,11 @@ public class TestTaskImpl {
     }
 
     @Override
-    protected TaskAttemptImpl createAttempt(int attemptNumber) {
+    protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
           attemptNumber, eventHandler, taskAttemptListener,
           conf, clock, taskHeartbeatHandler, appContext,
-          true, taskResource, containerContext);
+          true, taskResource, containerContext, schedCausalTA);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -730,9 +760,10 @@ public class TestTaskImpl {
         EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
         boolean isRescheduled,
-        Resource resource, ContainerContext containerContext) {
+        Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
       super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
+          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
+          schedCausalTA);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index f43f52c..feb290f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -215,7 +215,7 @@ public class TestTaskRecovery {
     long taStartTime = taskStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(0, task.getFinishedAttemptsCount());
     assertEquals(taskScheduledTime, task.scheduledTime);
@@ -721,7 +721,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -774,7 +774,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.KILLED, null, "", null));
     }
@@ -804,7 +804,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.FAILED, null, "", null));
     }
@@ -834,7 +834,7 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts - 1; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
           0, TaskAttemptState.FAILED, null, "", null));
     }
@@ -844,7 +844,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
-            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
 
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b52a4f9..9be3531 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -478,7 +478,10 @@ public class TestHistoryEventsProtoConversion {
         "vertex1", 10009l, ContainerId.newInstance(
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
-        "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
+        "host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024,
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0)
+        );
     TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getTaskAttemptID(),
@@ -489,6 +492,10 @@ public class TestHistoryEventsProtoConversion {
         deserializedEvent.getNodeId());
     Assert.assertEquals(event.getStartTime(),
         deserializedEvent.getStartTime());
+    Assert.assertEquals(event.getScheduledTime(),
+        deserializedEvent.getScheduledTime());
+    Assert.assertEquals(event.getSchedulingCausalTA(),
+        deserializedEvent.getSchedulingCausalTA());
     logEvents(event, deserializedEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 3ab204a..db871a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -160,7 +160,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null, "nodeHttpAddress");
+              nodeId, null, null, "nodeHttpAddress", 0, null);
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index 4c3fa97..cca984a 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -45,7 +45,7 @@ public class TaskAttemptInfo extends BaseInfo {
   private final String nodeId;
   private final String status;
   private final String logUrl;
-
+  private final String schedulingCausalTA;
   private TaskInfo taskInfo;
 
   private Container container;
@@ -66,6 +66,7 @@ public class TaskAttemptInfo extends BaseInfo {
     diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
     successfulAttemptId = otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID);
     scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
+    schedulingCausalTA = otherInfoNode.optString(Constants.SCHEDULING_CAUSAL_ATTEMPT);
 
     containerId = otherInfoNode.optString(Constants.CONTAINER_ID);
     String id = otherInfoNode.optString(Constants.NODE_ID);
@@ -111,6 +112,10 @@ public class TaskAttemptInfo extends BaseInfo {
   public final long getScheduledTime() {
     return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime());
   }
+  
+  public final String getSchedulingCausalTA() {
+    return schedulingCausalTA;
+  }
 
   @Override
   public final String getDiagnostics() {

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
index faff182..1d59e98 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
@@ -240,6 +240,10 @@ public class TestATSFileParser {
         assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
         assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
         assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+          assertTrue(attemptInfo.getStartTime() > 0);
+          assertTrue(attemptInfo.getScheduledTime() > 0);
+        }
       }
       assertTrue(vertexInfo.getLastTaskToFinish() != null);
       if (vertexInfo.getVertexName().equals(TOKENIZER)) {
@@ -326,6 +330,17 @@ public class TestATSFileParser {
         20); //Every line has 2 words. 10 lines x 2 words = 20
     verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
         "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+    
+    for (TaskInfo taskInfo : summationVertex.getTasks()) {
+      String lastAttemptId = null;
+      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+        if (lastAttemptId != null) {
+          // failed attempt should be causal TA of next attempt
+          assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+        }
+        lastAttemptId = attemptInfo.getTaskAttemptId();
+      }
+    }
 
     //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
     //TaskCounter.REDUCE_INPUT_RECORDS

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 77f4dd1..95f77e2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -467,6 +467,11 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
     atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
     atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
+    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+    if (event.getSchedulingCausalTA() != null) {
+      atsEntity.addOtherInfo(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT,
+          event.getSchedulingCausalTA().toString());
+    }
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4d381d77/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index e324d1b..bf8d0ec 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -165,7 +165,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null, "nodeHttpAddress");
+              nodeId, null, null, "nodeHttpAddress", 0, null);
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
@@ -724,8 +724,10 @@ public class TestHistoryEventTimelineConversion {
   @Test(timeout = 5000)
   public void testConvertTaskAttemptStartedEvent() {
     long startTime = random.nextLong();
+    long scheduleTime = 1024;
     TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
-        startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
+        startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress", 
+        scheduleTime, tezTaskAttemptID);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -770,6 +772,9 @@ public class TestHistoryEventTimelineConversion {
         timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS));
     Assert.assertTrue(TaskAttemptState.RUNNING.name()
         .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
+    Assert.assertEquals(tezTaskAttemptID.toString(), 
+        timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT));
+    Assert.assertEquals(scheduleTime, timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME));
   }
 
   @Test(timeout = 5000)


Mime
View raw message