tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [10/27] tez git commit: TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history record (bikaS)
Date Wed, 10 Dec 2014 03:30:20 GMT
TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history record (bikaS)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/81eef37d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/81eef37d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/81eef37d

Branch: refs/heads/TEZ-8
Commit: 81eef37d9e1e9222ef09eed319c45cdcd9034cd8
Parents: e8294b8
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Nov 24 11:15:44 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Nov 24 11:15:44 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tez/common/ATSConstants.java     |   1 +
 .../records/TaskAttemptTerminationCause.java    |  45 ++++
 .../tez/dag/app/TaskHeartbeatHandler.java       |   3 +-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   2 +
 .../event/TaskAttemptEventAttemptFailed.java    |  13 +-
 .../TaskAttemptEventContainerTerminated.java    |  13 +-
 ...AttemptEventContainerTerminatedBySystem.java |  13 +-
 .../TaskAttemptEventContainerTerminating.java   |  12 +-
 .../dag/event/TaskAttemptEventKillRequest.java  |  13 +-
 .../dag/event/TaskAttemptEventNodeFailed.java   |  12 +-
 .../dag/event/TaskAttemptEventOutputFailed.java |   9 +-
 .../TaskAttemptEventTerminationCauseEvent.java  |  26 +++
 .../dag/app/dag/event/TaskAttemptEventType.java |   3 +-
 .../dag/app/dag/event/TaskEventTermination.java |  28 ++-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  39 +++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  19 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  36 ++-
 .../app/launcher/LocalContainerLauncher.java    |   9 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  14 +-
 .../rm/container/AMContainerEventCompleted.java |  13 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  96 ++++----
 .../events/TaskAttemptFinishedEvent.java        |  20 +-
 .../impl/HistoryEventJsonConversion.java        |   5 +-
 tez-dag/src/main/proto/HistoryEvents.proto      |   1 +
 .../org/apache/tez/dag/app/TestPreemption.java  |   2 +
 .../org/apache/tez/dag/app/TestSpeculation.java |   3 +
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  51 ++--
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  13 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  12 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |  50 ++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 133 +++++++++--
 .../app/rm/TestTaskSchedulerEventHandler.java   |  30 +++
 .../dag/app/rm/container/TestAMContainer.java   | 230 ++++++++++++++-----
 .../TestHistoryEventsProtoConversion.java       |   8 +-
 .../impl/TestHistoryEventJsonConversion.java    |   3 +-
 .../ats/HistoryEventTimelineConversion.java     |   3 +
 .../ats/TestHistoryEventTimelineConversion.java |   3 +-
 38 files changed, 762 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a393a2..011bbaf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
+  history record
   TEZ-14. Support MR like speculation capabilities based on latency deviation
   from the mean
   TEZ-1733. TezMerger should sort FileChunks on size when merging

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 7b47b9c..e502374 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -62,6 +62,7 @@ public class ATSConstants {
   public static final String FINISH_TIME = "endTime";
   public static final String TIME_TAKEN = "timeTaken";
   public static final String STATUS = "status";
+  public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum";
   public static final String DIAGNOSTICS = "diagnostics";
   public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
   public static final String COUNTERS = "counters";

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
new file mode 100644
index 0000000..ef0bb33
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public enum TaskAttemptTerminationCause {
+  UNKNOWN_ERROR, // The error cause is unknown. Usually means a gap in error propagation
+  
+  TERMINATED_BY_CLIENT, // Killed by client command
+  TERMINATED_AT_SHUTDOWN, // Killed due to execution shutdown
+  INTERNAL_PREEMPTION, // Killed by Tez to make space for higher priority work
+  EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
+  TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded
+  TERMINATED_EFFECTIVE_SPECULATION, // Killed original attempt because speculation succeeded
+  TERMINATED_ORPHANED, // Attempt is no longer needed by the task
+  
+  APPLICATION_ERROR, // Failed due to application code error
+  FRAMEWORK_ERROR, // Failed due to code error in Tez code
+  INPUT_READ_ERROR, // Failed due to error in reading inputs
+  OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
+  OUTPUT_LOST, // Failed because the attempt's outputs were reported lost
+  TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
+  
+  CONTAINER_LAUNCH_FAILED, // Failed to launch container
+  CONTAINER_EXITED, // Container exited. Indicates gap in specific error propagation from the cluster
+  CONTAINER_STOPPED, // Container stopped or released by Tez
+  NODE_FAILED, // Node for the container failed
+  NODE_DISK_ERROR, // Disk failed on the node running the task
+  
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 6b698aa..d115b14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 
@@ -60,6 +61,6 @@ public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID>
   protected void handleTimeOut(TezTaskAttemptID attemptId) {
     eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId,
         TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString()
-        + " Timed out after " + timeOut / 1000 + " secs"));
+        + " Timed out after " + timeOut / 1000 + " secs", TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR));
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 4aa220d..3f60a4e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -73,6 +74,7 @@ public interface TaskAttempt {
   
   TaskAttemptReport getReport();
   List<String> getDiagnostics();
+  TaskAttemptTerminationCause getTerminationCause();
   TezCounters getCounters();
   float getProgress();
   TaskAttemptState getState();

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 5c7b956..b9c1d09 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -18,16 +18,19 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent 
-  implements DiagnosableEvent {
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String diagnostics;
+  private final TaskAttemptTerminationCause errorCause;
   public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
-      TaskAttemptEventType type, String diagnostics) {
+      TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) {
     super(id, type);
     this.diagnostics = diagnostics;
+    this.errorCause = errorCause;
   }
 
   @Override
@@ -35,5 +38,9 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
     return diagnostics;
   }
   
-  
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 87aa313..5dd0141 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,20 +17,29 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
-    implements DiagnosableEvent {
+    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
-  public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message) {
+  public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message, 
+      TaskAttemptTerminationCause errCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
     this.message = message;
+    this.errorCause = errCause;
   }
 
   @Override
   public String getDiagnosticInfo() {
     return message;
   }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a92aafd..a3c57e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,19 +18,28 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent 
-  implements DiagnosableEvent {
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String diagnostics;
-  public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics) {
+  private final TaskAttemptTerminationCause errorCause;
+  public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics,
+      TaskAttemptTerminationCause errorCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
     this.diagnostics = diagnostics;
+    this.errorCause = errorCause;
   }
 
   @Override
   public String getDiagnosticInfo() {
     return diagnostics;
   }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
index 7da6e14..02d04a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
@@ -17,17 +17,20 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
-    implements DiagnosableEvent {
+    implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
   public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
-      String diagMessage) {
+      String diagMessage, TaskAttemptTerminationCause errCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
     this.message = diagMessage;
+    this.errorCause = errCause;
   }
 
   @Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
     return this.message;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 0205fcf..a0dfe5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -17,15 +17,19 @@
 */
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements DiagnosableEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent 
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
-  public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message) {
+  public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, TaskAttemptTerminationCause err) {
     super(id, TaskAttemptEventType.TA_KILL_REQUEST);
     this.message = message;
+    this.errorCause = err;
   }
 
   @Override
@@ -33,4 +37,9 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements Dia
     return message;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
index 6d97466..541ef00 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
@@ -17,17 +17,20 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventNodeFailed extends TaskAttemptEvent 
-  implements DiagnosableEvent{
+  implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
   private final String message;
+  private final TaskAttemptTerminationCause errorCause;
 
   public TaskAttemptEventNodeFailed(TezTaskAttemptID id,
-      String diagMessage) {
+      String diagMessage, TaskAttemptTerminationCause errorCause) {
     super(id, TaskAttemptEventType.TA_NODE_FAILED);
     this.message = diagMessage;
+    this.errorCause = errorCause;
   }
 
   @Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
     return this.message;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
index 678e1e7..6bc110a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -18,10 +18,12 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
-public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
+public class TaskAttemptEventOutputFailed extends TaskAttemptEvent 
+  implements TaskAttemptEventTerminationCauseEvent {
   
   private TezEvent inputFailedEvent;
   private int consumerTaskNumber;
@@ -40,5 +42,10 @@ public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
   public int getConsumerTaskNumber() {
     return consumerTaskNumber;
   }
+
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return TaskAttemptTerminationCause.OUTPUT_LOST;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
new file mode 100644
index 0000000..70c20e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+public interface TaskAttemptEventTerminationCauseEvent {
+
+  public TaskAttemptTerminationCause getTerminationCause();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index c8eec1b..b7aca36 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -29,8 +29,7 @@ public enum TaskAttemptEventType {
 //Producer: TaskAttemptListener
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,
-  TA_DIAGNOSTICS_UPDATE,
-  TA_COMMIT_PENDING,
+  TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED
   TA_DONE,
   TA_FAILED,
   TA_TIMED_OUT,

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
index 73d5744..d48a0bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -18,22 +18,23 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskID;
 
-public class TaskEventTermination extends TaskEvent implements DiagnosableEvent{
+public class TaskEventTermination extends TaskEvent implements DiagnosableEvent,
+    TaskAttemptEventTerminationCauseEvent {
 
-  private TaskTerminationCause terminationCause;
-  private String diagnostics;
+  private final String diagnostics;
+  private final TaskAttemptTerminationCause errorCause;
   
-  public TaskEventTermination(TezTaskID taskID, TaskTerminationCause terminationCause) {
+  public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause errorCause, String diagnostics) {
     super(taskID, TaskEventType.T_TERMINATE);
-    this.terminationCause = terminationCause;
-    this.diagnostics = "Task is terminated due to:" + terminationCause.name();
-  }
-
-  public TaskTerminationCause getTerminationCause() {
-    return terminationCause;
+    this.errorCause = errorCause;
+    if (diagnostics != null) {
+      this.diagnostics = diagnostics;
+    } else {
+      this.diagnostics = "Task is terminated due to: " + errorCause.name();
+    }
   }
 
   @Override
@@ -41,4 +42,9 @@ public class TaskEventTermination extends TaskEvent implements DiagnosableEvent{
     return diagnostics;
   }
 
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errorCause;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3056c1e..5103095 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -91,6 +92,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -124,6 +126,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected EventHandler eventHandler;
   private final TezTaskAttemptID attemptId;
   private final Clock clock;
+  private TaskAttemptTerminationCause terminationCause = TaskAttemptTerminationCause.UNKNOWN_ERROR;
   private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
@@ -291,7 +294,6 @@ public class TaskAttemptImpl implements TaskAttempt,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -313,7 +315,6 @@ public class TaskAttemptImpl implements TaskAttempt,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -332,7 +333,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_SCHEDULE,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -352,7 +352,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_SCHEDULE,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
               TaskAttemptEventType.TA_KILL_REQUEST,
@@ -498,6 +497,11 @@ public class TaskAttemptImpl implements TaskAttempt,
       readLock.unlock();
     }
   }
+  
+  @Override
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return terminationCause;
+  }
 
   @Override
   public TezCounters getCounters() {
@@ -745,6 +749,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         this.reportedStatus.counters = tEvent.getCounters();
         this.reportedStatus.progress = 1f;
         this.reportedStatus.state = tEvent.getState();
+        this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError()
+            : TaskAttemptTerminationCause.UNKNOWN_ERROR;
         this.diagnostics.add(tEvent.getDiagnostics());
         this.recoveredState = tEvent.getState();
         sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
@@ -959,8 +965,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(), getLaunchTime(),
-        getFinishTime(), TaskAttemptState.SUCCEEDED, "",
-        getCounters());
+        getFinishTime(), TaskAttemptState.SUCCEEDED, null,
+        "", getCounters());
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -971,9 +977,9 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(), getLaunchTime(),
         clock.getTime(), state,
+        terminationCause,
         StringUtils.join(
-            getDiagnostics(), LINE_SEPARATOR),
-        getCounters());
+            getDiagnostics(), LINE_SEPARATOR), getCounters());
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1003,7 +1009,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         LOG.error(msg, e);
         String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
         new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
-            new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag));
+            new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
+                TaskAttemptTerminationCause.APPLICATION_ERROR));
         return TaskAttemptStateInternal.FAILED;
       }
       // Create startTaskRequest
@@ -1085,6 +1092,13 @@ public class TaskAttemptImpl implements TaskAttempt,
       if (event instanceof DiagnosableEvent) {
         ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
       }
+      
+      // assertions are normally enabled only in tests, so this catches any new event type that lacks an error cause
+      assert event instanceof TaskAttemptEventTerminationCauseEvent;
+      
+      if (event instanceof TaskAttemptEventTerminationCauseEvent) {
+        ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent) event).getTerminationCause());
+      }
 
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           helper.getTaskAttemptState()));
@@ -1484,6 +1498,13 @@ public class TaskAttemptImpl implements TaskAttempt,
       sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
     }
   }
+  
+  private void trySetTerminationCause(TaskAttemptTerminationCause err) {
+    // keep only the first error cause
+    if (terminationCause == TaskAttemptTerminationCause.UNKNOWN_ERROR) {
+      terminationCause = err;
+    }
+  }
 
   private void initTaskAttemptStatus(TaskAttemptStatus result) {
     result.progress = 0.0f;

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index c3ba11d..b20fa13 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -84,6 +85,7 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -91,6 +93,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
 
@@ -698,7 +701,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (getState() != TaskState.RUNNING) {
         LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
         eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
-            , "Task not running. Bad attempt."));
+            , "Task not running. Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED));
         return false;
       }
       if (commitAttempt == null) {
@@ -1027,15 +1030,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
             task.successfulAttempt + " has succeeded");
           String diagnostics = null;
+          TaskAttemptTerminationCause errCause = null;
           if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
             diagnostics = "Killed this attempt as other speculative attempt : " + successTaId
                 + " succeeded";
+            errCause = TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION;
           } else {
             diagnostics = "Killed this speculative attempt as original attempt: " + successTaId
                 + " succeeded";
+            errCause = TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION;
           }
           task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
-              .getID(), diagnostics));
+              .getID(), diagnostics, errCause));
         }
       }
       // send notification to DAG scheduler
@@ -1416,14 +1422,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+  private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
     if (commitAttempt != null && commitAttempt.equals(attempt)) {
       LOG.info("Removing commit attempt: " + commitAttempt);
       commitAttempt = null;
     }
     if (attempt != null && !attempt.isFinished()) {
-      eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
-          logMsg));
+      eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(), logMsg, errorCause));
     }
   }
 
@@ -1445,8 +1450,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
       // issue kill to all non finished attempts
       for (TaskAttempt attempt : task.attempts.values()) {
-        task.killUnfinishedAttempt
-            (attempt, "Task KILL is received. Killing attempt!");
+        task.killUnfinishedAttempt(attempt, "Task KILL is received. Killing attempt. Diagnostics: "
+            + terminateEvent.getDiagnosticInfo(), terminateEvent.getTerminationCause());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 54cd6c4..3246f38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -139,6 +139,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -1821,12 +1822,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
    */
   void tryEnactKill(VertexTerminationCause trigger,
       TaskTerminationCause taskterminationCause) {
+    // In most cases the dag is shutting down due to some error
+    TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
+    if (taskterminationCause == TaskTerminationCause.DAG_KILL) {
+      errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
+    }
     if(trySetTerminationCause(trigger)){
-      LOG.info("Killing tasks in vertex: " + logIdentifier + " due to trigger: "
-          + trigger);
+      String msg = "Killing tasks in vertex: " + logIdentifier + " due to trigger: " + trigger; 
+      LOG.info(msg);
       for (Task task : tasks.values()) {
-        eventHandler.handle(
-            new TaskEventTermination(task.getTaskId(), taskterminationCause));
+        eventHandler.handle( // attempt was terminated because the vertex is shutting down
+            new TaskEventTermination(task.getTaskId(), errCause, msg));
       }
     }
   }
@@ -3912,12 +3918,32 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       case TASK_ATTEMPT_FAILED_EVENT:
         {
           checkEventSourceMetadata(vertex, sourceMeta);
+          TaskAttemptTerminationCause errCause = null;
+          switch (sourceMeta.getEventGenerator()) {
+          case INPUT:
+            errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
+            break;
+          case PROCESSOR:
+            errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
+            break;
+          case OUTPUT:
+            errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
+            break;
+          case SYSTEM:
+            errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+            break;
+          default:
+            throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
+                sourceMeta.getEventGenerator());
+          }
           TaskAttemptFailedEvent taskFailedEvent =
               (TaskAttemptFailedEvent) tezEvent.getEvent();
           vertex.getEventHandler().handle(
               new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
                   TaskAttemptEventType.TA_FAILED,
-                  "Error: " + taskFailedEvent.getDiagnostics()));
+                  "Error: " + taskFailedEvent.getDiagnostics(), 
+                  errCause)
+              );
         }
         break;
       default:

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index baeb9a3..f14fd5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.runtime.task.TezChild;
 
 
@@ -242,14 +243,14 @@ public class LocalContainerLauncher extends AbstractService implements
         LOG.info("Container: " + containerId + " completed successfully");
         appContext.getEventHandler().handle(
             new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
-                null));
+                null, TaskAttemptTerminationCause.CONTAINER_EXITED));
       } else {
         LOG.info("Container: " + containerId + " completed but with errors");
         appContext.getEventHandler().handle(
             new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
                 result.getErrorMessage() == null ?
                     (result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
-                    result.getErrorMessage()));
+                    result.getErrorMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
       }
     }
 
@@ -263,13 +264,13 @@ public class LocalContainerLauncher extends AbstractService implements
         appContext.getEventHandler()
             .handle(new AMContainerEventCompleted(containerId,
                 TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
-                t.getMessage()));
+                t.getMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
       } else {
         LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher");
         appContext.getEventHandler()
             .handle(new AMContainerEventCompleted(containerId,
                 TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
-                "CancellationException"));
+                "CancellationException", TaskAttemptTerminationCause.CONTAINER_EXITED));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index ec8e73f..625b09e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
 import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 import com.google.common.base.Preconditions;
 
@@ -422,19 +423,22 @@ public class TaskSchedulerEventHandler extends AbstractService
     // Inform the Containers about completion.
     AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
     if (amContainer != null) {
-      String message = null;
+      String message = "Container completed. ";
+      TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
       int exitStatus = containerStatus.getExitStatus();
       if (exitStatus == ContainerExitStatus.PREEMPTED) {
         message = "Container preempted externally. ";
+        errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
       } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
         message = "Container disk failed. ";
-      } else {
+        errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+      } else if (exitStatus != ContainerExitStatus.SUCCESS){
         message = "Container failed. ";
       }
       if (containerStatus.getDiagnostics() != null) {
         message += containerStatus.getDiagnostics();
       }
-      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message));
+      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
     }
   }
 
@@ -550,8 +554,8 @@ public class TaskSchedulerEventHandler extends AbstractService
   public void preemptContainer(ContainerId containerId) {
     taskScheduler.deallocateContainer(containerId);
     // Inform the Containers about completion.
-    sendEvent(new AMContainerEventCompleted(containerId,
-        ContainerExitStatus.PREEMPTED, "Container preempted internally"));
+    sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
+        "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
   }
 
   public void setShouldUnregisterFlag() {

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index e9649f3..a455f1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,17 +20,20 @@ package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 public class AMContainerEventCompleted extends AMContainerEvent {
 
   private final int exitStatus;
   private final String diagnostics;
+  private final TaskAttemptTerminationCause errCause;
 
   public AMContainerEventCompleted(ContainerId containerId, 
-      int exitStatus, String diagnostics) {
+      int exitStatus, String diagnostics, TaskAttemptTerminationCause errCause) {
     super(containerId, AMContainerEventType.C_COMPLETED);
     this.exitStatus = exitStatus;
     this.diagnostics = diagnostics;
+    this.errCause = errCause;
   }
 
   public boolean isPreempted() {
@@ -41,6 +44,10 @@ public class AMContainerEventCompleted extends AMContainerEvent {
     return (exitStatus == ContainerExitStatus.DISKS_FAILED);
   }
   
+  public boolean isClusterAction() {
+    return isPreempted() || isDiskFailed();
+  }
+  
   public String getDiagnostics() {
     return diagnostics;
   }
@@ -48,5 +55,9 @@ public class AMContainerEventCompleted extends AMContainerEvent {
   public int getContainerExitStatus() {
     return exitStatus;
   }
+  
+  public TaskAttemptTerminationCause getTerminationCause() {
+    return errCause;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index a0f9cb7..9d4f46b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 //import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -533,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
           .getTaskAttemptId());
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
           "AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
-              container.getContainerId());
+              container.getContainerId(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       container.deAllocate();
       LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
           "  for ContainerId: " + container.getContainerId() +
@@ -644,8 +645,10 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
         AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
+        // For a properly set up cluster this should almost always be an app error.
+        // TODO: differentiate launch failures caused by the framework/cluster from those caused by the app.
         container.sendTerminatingToTaskAttempt(container.pendingAttempt,
-            event.getMessage());
+            event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
       }
       container.unregisterFromTAListener();
       container.deAllocate();
@@ -659,12 +662,17 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       if (container.pendingAttempt != null) {
         String errorMessage = getMessage(container, event);
-        if (event.isPreempted() || event.isDiskFailed()) {
+        if (event.isClusterAction()) {
           container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
-              errorMessage);
+              errorMessage, event.getTerminationCause());
         } else {
-          container.sendTerminatedToTaskAttempt(container.pendingAttempt,
-              errorMessage);
+          container
+              .sendTerminatedToTaskAttempt(
+                  container.pendingAttempt,
+                  errorMessage,
+                  // if termination cause is generic exited then replace with specific
+                  (event.getTerminationCause() == TaskAttemptTerminationCause.CONTAINER_EXITED ? 
+                      TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED : event.getTerminationCause()));
         }
         container.registerFailedAttempt(container.pendingAttempt);
         container.pendingAttempt = null;
@@ -696,7 +704,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
         container.sendTerminatingToTaskAttempt(container.pendingAttempt,
-            getMessage(container, cEvent));
+            getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
       }
       container.unregisterFromTAListener();
       container.logStopped(container.pendingAttempt == null ? 
@@ -722,27 +730,31 @@ public class AMContainerImpl implements AMContainer {
         return;
       }
       container.nodeFailed = true;
-      String errorMessage = null;
+      String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
       if (cEvent instanceof DiagnosableEvent) {
-        errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+        errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
       }
 
       for (TezTaskAttemptID taId : container.failedAssignments) {
-        container.sendNodeFailureToTA(taId, errorMessage);
+        container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED);
       }
       for (TezTaskAttemptID taId : container.completedAttempts) {
-        container.sendNodeFailureToTA(taId, errorMessage);
+        container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED);
       }
 
       if (container.pendingAttempt != null) {
         // Will be null in COMPLETED state.
-        container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
-        container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node failure");
+        container.sendNodeFailureToTA(container.pendingAttempt, errorMessage, 
+            TaskAttemptTerminationCause.NODE_FAILED);
+        container.sendTerminatingToTaskAttempt(container.pendingAttempt, errorMessage,
+            TaskAttemptTerminationCause.NODE_FAILED);
       }
       if (container.runningAttempt != null) {
         // Will be null in COMPLETED state.
-        container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
-        container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure");
+        container.sendNodeFailureToTA(container.runningAttempt, errorMessage, 
+            TaskAttemptTerminationCause.NODE_FAILED);
+        container.sendTerminatingToTaskAttempt(container.runningAttempt, errorMessage,
+            TaskAttemptTerminationCause.NODE_FAILED);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
     }
@@ -767,7 +779,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.pendingAttempt,
             "Container " + container.getContainerId() +
                 " hit an invalid transition - " + cEvent.getType() + " at " +
-                container.getState());
+                container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
       container.sendStopRequestToNM();
@@ -909,12 +921,12 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      if (event.isPreempted() || event.isDiskFailed()) {
+      if (event.isClusterAction()) {
         container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
-            getMessage(container, event));
+            getMessage(container, event), event.getTerminationCause());
       } else {
         container.sendTerminatedToTaskAttempt(container.runningAttempt,
-            getMessage(container, event));
+            getMessage(container, event), event.getTerminationCause());
       }
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.registerFailedAttempt(container.runningAttempt);
@@ -929,8 +941,8 @@ public class AMContainerImpl implements AMContainer {
 
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.sendTerminatingToTaskAttempt(container.runningAttempt,
-          " Container" + container.getContainerId() +
-              " received a STOP_REQUEST");
+          " Container" + container.getContainerId() + " received a STOP_REQUEST",
+          TaskAttemptTerminationCause.CONTAINER_STOPPED);
       super.transition(container, cEvent);
     }
   }
@@ -964,7 +976,7 @@ public class AMContainerImpl implements AMContainer {
       container.sendTerminatingToTaskAttempt(container.runningAttempt,
           "Container " + container.getContainerId() +
               " hit an invalid transition - " + cEvent.getType() + " at " +
-              container.getState());
+              container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
     }
   }
 
@@ -978,7 +990,8 @@ public class AMContainerImpl implements AMContainer {
           " cannot be allocated to container: " + container.getContainerId() +
           " in " + container.getState() + " state";
       container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
-      container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
+      container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+          TaskAttemptTerminationCause.CONTAINER_EXITED);
       container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
@@ -1001,15 +1014,18 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       String diag = event.getDiagnostics();
       for (TezTaskAttemptID taId : container.failedAssignments) {
-        container.sendTerminatedToTaskAttempt(taId, diag);
+        container.sendTerminatedToTaskAttempt(taId, diag, 
+            TaskAttemptTerminationCause.CONTAINER_EXITED);
       }
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag, 
+            TaskAttemptTerminationCause.CONTAINER_EXITED);
         container.registerFailedAttempt(container.pendingAttempt);
         container.pendingAttempt = null;
       }
       if (container.runningAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
+        container.sendTerminatedToTaskAttempt(container.runningAttempt, diag, 
+            TaskAttemptTerminationCause.CONTAINER_EXITED);
         container.registerFailedAttempt(container.runningAttempt);
         container.runningAttempt = null;
       }
@@ -1078,12 +1094,11 @@ public class AMContainerImpl implements AMContainer {
           + " in COMPLETED state";
       container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
       container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
-          errorMessage);
+          errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       container.registerFailedAttempt(event.getTaskAttemptId());
     }
   }
 
-
   private void handleExtraTAAssign(
       AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
     this.inError = true;
@@ -1092,8 +1107,10 @@ public class AMContainerImpl implements AMContainer {
         ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
         ". Current state: " + this.getState();
     this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
-    this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
-    this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
+    this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+        TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+    this.sendTerminatingToTaskAttempt(currentTaId, errorMessage,
+        TaskAttemptTerminationCause.FRAMEWORK_ERROR);
     this.registerFailedAttempt(event.getTaskAttemptId());
     LOG.warn(errorMessage);
     this.logStopped(ContainerExitStatus.INVALID);
@@ -1122,28 +1139,29 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendTerminatedToTaskAttempt(
-      TezTaskAttemptID taId, String message) {
-    sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+      TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) {
+    sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause));
   }
   
   protected void sendContainerTerminatedBySystemToTaskAttempt(
-    TezTaskAttemptID taId, String message) {
-      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message));
+    TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) {
+      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause));
   }
 
   protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
-      String message) {
-    sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+      String message, TaskAttemptTerminationCause errorCause) {
+    sendEvent(new TaskAttemptEventContainerTerminating(taId, message, errorCause));
   }
 
   protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) {
     if (this.nodeFailed) {
-      this.sendNodeFailureToTA(taId, "Node Failed");
+      this.sendNodeFailureToTA(taId, "Node Failed", TaskAttemptTerminationCause.NODE_FAILED);
     }
   }
 
-  protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
-    sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+  protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message, 
+      TaskAttemptTerminationCause errorCause) {
+    sendEvent(new TaskAttemptEventNodeFailed(taId, message, errorCause));
   }
 
   protected void sendStartRequestToNM(ContainerLaunchContext clc) {

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 0ae8061..2b21c89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
 
@@ -43,21 +44,23 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private TaskAttemptState state;
   private String diagnostics;
   private TezCounters tezCounters;
+  private TaskAttemptTerminationCause error;
 
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
       long startTime,
       long finishTime,
       TaskAttemptState state,
-      String diagnostics,
-      TezCounters counters) {
+      TaskAttemptTerminationCause error,
+      String diagnostics, TezCounters counters) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.startTime = startTime;
     this.finishTime = finishTime;
     this.state = state;
     this.diagnostics = diagnostics;
-    tezCounters = counters;
+    this.tezCounters = counters;
+    this.error = error;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -87,6 +90,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     if (diagnostics != null) {
       builder.setDiagnostics(diagnostics);
     }
+    if (error != null) {
+      builder.setErrorEnum(error.name());
+    }
     if (tezCounters != null) {
       builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
     }
@@ -100,6 +106,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     if (proto.hasDiagnostics()) {
       this.diagnostics = proto.getDiagnostics();
     }
+    if (proto.hasErrorEnum()) {
+      this.error = TaskAttemptTerminationCause.valueOf(proto.getErrorEnum());
+    }
     if (proto.hasCounters()) {
       this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
@@ -129,6 +138,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", finishTime=" + finishTime
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
+        + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
         + ", counters=" + (tezCounters == null ? "null" :
           tezCounters.toString()
@@ -146,6 +156,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   public String getDiagnostics() {
     return diagnostics;
   }
+  
+  public TaskAttemptTerminationCause getTaskAttemptError() {
+    return error;
+  }
 
   public long getFinishTime() {
     return finishTime;

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8560359..79a0c34 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,11 +18,9 @@
 
 package org.apache.tez.dag.history.logging.impl;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.tez.common.ATSConstants;
@@ -485,6 +483,9 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
     otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    if (event.getTaskAttemptError() != null) {
+      otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+    }
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getCounters()));

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e8f323d..45e9582 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -166,6 +166,7 @@ message TaskAttemptFinishedProto {
   optional int32 state = 3;
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
+  optional string error_enum = 6;
 }
 
 message EventMetaDataProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index bc15954..8cc2e8b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -41,6 +41,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -200,6 +201,7 @@ public class TestPreemption {
       TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);      
       TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
       Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+      Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, taImpl.getTerminationCause());
     }
     
     System.out.println("TestPreemption - Done running - " + info);

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 114c44b..38eb934 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -115,6 +116,7 @@ public class TestSpeculation {
     Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
     TaskAttempt killedAttempt = task.getAttempt(killedTaId);
     Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
+    Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
     tezClient.stop();
   }
   
@@ -154,6 +156,7 @@ public class TestSpeculation {
     Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
     TaskAttempt killedAttempt = task.getAttempt(killedTaId);
     Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
+    Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
     tezClient.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6796d02..29469b1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -291,9 +292,11 @@ public class TestTaskAttempt {
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
     // At state STARTING.
-    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+        TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
     // At some KILLING state.
-    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+        TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
     // taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
     // null));
     assertFalse(eventHandler.internalError);
@@ -358,7 +361,7 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
-        "Terminating"));
+        "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR));
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
         eventHandler.internalError);
@@ -368,6 +371,7 @@ public class TestTaskAttempt {
 
     assertEquals(1, taImpl.getDiagnostics().size());
     assertEquals("Terminating", taImpl.getDiagnostics().get(0));
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
 
     int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
     arg = ArgumentCaptor.forClass(Event.class);
@@ -384,13 +388,16 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
-        "Terminated"));
+        "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
     int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
 
     assertEquals(2, taImpl.getDiagnostics().size());
     assertEquals("Terminated", taImpl.getDiagnostics().get(1));
+    
+    // check that original error cause is retained
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
   }
 
 
@@ -445,13 +452,14 @@ public class TestTaskAttempt {
         null));
     assertEquals("Task attempt is not in running state", taImpl.getState(),
         TaskAttemptState.RUNNING);
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated"));
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated",
+        TaskAttemptTerminationCause.CONTAINER_EXITED));
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
         eventHandler.internalError);
 
     assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-
+    assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, taImpl.getTerminationCause());
     // TODO Ensure TA_TERMINATING after this is ingored.
   }
 
@@ -535,17 +543,18 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
-        "Terminated"));
+        "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
     int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
 
     // Verify that the diagnostic message included in the Terminated event is not
-    // captured - TA already succeeded.
+    // captured - TA already succeeded. Error cause is the default value.
     assertEquals(0, taImpl.getDiagnostics().size());
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
   }
   
-  @Test//(timeout = 5000)
+  @Test(timeout = 5000)
   public void testFailure() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
@@ -607,18 +616,23 @@ public class TestTaskAttempt {
     
     taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
     
-    taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0"));
+    taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0",
+        TaskAttemptTerminationCause.APPLICATION_ERROR));
 
     assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", taImpl.getInternalState(),
         TaskAttemptStateInternal.FAIL_IN_PROGRESS);
 
     assertEquals(1, taImpl.getDiagnostics().size());
     assertEquals("0", taImpl.getDiagnostics().get(0));
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
     
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1"));
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1",
+        TaskAttemptTerminationCause.CONTAINER_EXITED));
 
     assertEquals(2, taImpl.getDiagnostics().size());
     assertEquals("1", taImpl.getDiagnostics().get(1));
+    // err cause does not change
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
 
     int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5;
     arg = ArgumentCaptor.forClass(Event.class);
@@ -815,8 +829,9 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
 
     // Verify that the diagnostic message included in the Terminated event is not
-    // captured - TA already succeeded.
+    // captured - TA already succeeded. Error cause should be the default value
     assertEquals(0, taImpl.getDiagnostics().size());
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
   }
 
   @Test(timeout = 5000)
@@ -898,7 +913,8 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     // Send out a Node Failure.
-    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned",
+        TaskAttemptTerminationCause.NODE_FAILED));
     // Verify in KILLED state
     assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED,
         taImpl.getState());
@@ -914,6 +930,7 @@ public class TestTaskAttempt {
     // Verify still in KILLED state
     assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED,
         taImpl.getState());
+    assertEquals(TaskAttemptTerminationCause.NODE_FAILED, taImpl.getTerminationCause());
   }
   
   @Test(timeout = 5000)
@@ -995,7 +1012,8 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
     // Send out a Node Failure.
-    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+    taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned", 
+        TaskAttemptTerminationCause.NODE_FAILED));
 
     // Verify no additional events
     int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
@@ -1005,6 +1023,8 @@ public class TestTaskAttempt {
     // Verify still in SUCCEEDED state
     assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED,
         taImpl.getState());
+    // error cause remains as default value
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
   }
 
   @Test(timeout = 5000)
@@ -1083,6 +1103,8 @@ public class TestTaskAttempt {
     taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
+    // default value of error cause
+    assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
 
     // different destination attempt reports error. now threshold crossed
     TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
@@ -1091,6 +1113,7 @@ public class TestTaskAttempt {
     
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
+    assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause());
 
     assertEquals(true, taImpl.inputFailedReported);
     int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 143268b..100e8d9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -46,6 +46,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.junit.Before;
@@ -87,10 +88,15 @@ public class TestTaskAttemptRecovery {
   private void restoreFromTAFinishedEvent(TaskAttemptState state) {
     String diag = "test_diag";
     TezCounters counters = mock(TezCounters.class);
+    
+    TaskAttemptTerminationCause errorEnum = null;
+    if (state != TaskAttemptState.SUCCEEDED) {
+      errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
+    }
 
     TaskAttemptState recoveredState =
         ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            startTime, finishTime, state, diag, counters));
+            startTime, finishTime, state, errorEnum, diag, counters));
     assertEquals(startTime, ta.getLaunchTime());
     assertEquals(finishTime, ta.getFinishTime());
     assertEquals(counters, ta.reportedStatus.counters);
@@ -99,6 +105,11 @@ public class TestTaskAttemptRecovery {
     assertEquals(1, ta.getDiagnostics().size());
     assertEquals(diag, ta.getDiagnostics().get(0));
     assertEquals(state, recoveredState);
+    if (state != TaskAttemptState.SUCCEEDED) {
+      assertEquals(errorEnum, ta.getTerminationCause());
+    } else {
+      assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, ta.getTerminationCause());
+    }
   }
 
   private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 88fa83d..e363dbd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -54,7 +54,6 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -64,6 +63,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -173,7 +173,7 @@ public class TestTaskImpl {
   }
 
   private void killTask(TezTaskID taskId) {
-    mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
+    mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
     assertTaskKillWaitState();
   }
 
@@ -553,18 +553,18 @@ public class TestTaskImpl {
   @Test
   public void testDiagnostics_KillNew(){
     TezTaskID taskId = getNewTaskID();
-    mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
+    mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
     assertEquals(1, mockTask.getDiagnostics().size());
-    assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.DAG_KILL.name()));
+    assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
   }
   
   @Test
   public void testDiagnostics_Kill(){
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
-    mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.OTHER_TASK_FAILURE));
+    mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
     assertEquals(1, mockTask.getDiagnostics().size());
-    assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.OTHER_TASK_FAILURE.name()));
+    assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
   }
 
   // TODO Add test to validate the correct commit attempt.


Mime
View raw message