tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [3/3] tez git commit: TEZ-3161. Allow task to report different kinds of errors - fatal / kill (sseth)
Date Wed, 06 Apr 2016 03:41:51 GMT
TEZ-3161. Allow task to report different kinds of errors - fatal / kill (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/27a13fc9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/27a13fc9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/27a13fc9

Branch: refs/heads/master
Commit: 27a13fc9761784c37af8adfaeb9337a4abae7182
Parents: 5ce07f8
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Apr 5 20:41:26 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Apr 5 20:41:26 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/ATSConstants.java     |   1 +
 .../org/apache/tez/runtime/api/TaskContext.java |  34 ++-
 .../apache/tez/runtime/api/TaskFailureType.java |  31 +++
 tez-dag/pom.xml                                 |   1 +
 .../tez/dag/app/TaskCommunicatorManager.java    |  26 ++-
 .../tez/dag/app/TaskHeartbeatHandler.java       |   3 +-
 .../event/TaskAttemptEventAttemptFailed.java    |  27 ++-
 .../dag/app/dag/event/TaskEventTAFailed.java    |  46 ++++
 .../dag/app/dag/event/TaskEventTAKilled.java    |  37 +++
 .../dag/app/dag/event/TaskEventTALaunched.java  |  29 +++
 .../dag/app/dag/event/TaskEventTASucceeded.java |  29 +++
 .../dag/app/dag/event/TaskEventTAUpdate.java    |  16 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  64 ++++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  21 +-
 .../events/TaskAttemptFinishedEvent.java        |  17 ++
 .../impl/HistoryEventJsonConversion.java        |   3 +
 .../api/TaskCommunicatorContext.java            |   4 +-
 tez-dag/src/main/proto/HistoryEvents.proto      |   2 +
 .../apache/tez/dag/app/TestRecoveryParser.java  |   2 +-
 .../dag/app/TestTaskCommunicatorManager2.java   | 227 +++++++++++++++----
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |  21 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 184 +++++++++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 184 ++++++++++-----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  18 +-
 .../TestHistoryEventsProtoConversion.java       |  61 ++++-
 .../impl/TestHistoryEventJsonConversion.java    |   2 +-
 .../org/apache/tez/mapreduce/TestUmbilical.java |  18 +-
 .../ats/HistoryEventTimelineConversion.java     |   3 +
 .../ats/TestHistoryEventTimelineConversion.java |  20 +-
 tez-runtime-internals/pom.xml                   |   2 +-
 .../apache/tez/common/TezConverterUtils.java    |  30 ++-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  10 +-
 .../org/apache/tez/runtime/RuntimeTask.java     |  18 +-
 .../api/events/TaskAttemptFailedEvent.java      |   9 +-
 .../api/events/TaskAttemptKilledEvent.java      |  34 +++
 .../apache/tez/runtime/api/impl/EventType.java  |   1 +
 .../apache/tez/runtime/api/impl/TezEvent.java   |  26 ++-
 .../runtime/api/impl/TezInputContextImpl.java   |  13 ++
 .../runtime/api/impl/TezOutputContextImpl.java  |  13 ++
 .../api/impl/TezProcessorContextImpl.java       |  15 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |  17 +-
 .../tez/runtime/api/impl/TezUmbilical.java      |  13 +-
 .../internals/api/TaskReporterInterface.java    |   9 +-
 .../org/apache/tez/runtime/task/EndReason.java  |   3 +-
 .../apache/tez/runtime/task/TaskReporter.java   |  45 ++--
 .../tez/runtime/task/TaskRunner2Callable.java   |  13 +-
 .../tez/runtime/task/TaskRunner2Result.java     |  26 ++-
 .../apache/tez/runtime/task/TezTaskRunner2.java | 206 ++++++++++-------
 .../src/main/proto/Events.proto                 |  28 ---
 .../src/main/proto/RuntimeEvents.proto          |  38 ++++
 .../runtime/task/TaskExecutionTestHelpers.java  | 118 ++++++++--
 .../tez/runtime/task/TestTaskExecution2.java    | 156 +++++++++++--
 .../common/shuffle/impl/ShuffleManager.java     |   7 +-
 .../common/shuffle/orderedgrouped/Shuffle.java  |   3 +-
 .../writers/UnorderedPartitionedKVWriter.java   |   7 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |   9 +-
 .../shuffle/orderedgrouped/TestShuffle.java     |   4 +-
 .../TestUnorderedPartitionedKVWriter.java       |   7 +-
 .../processor/FilterByWordInputProcessor.java   |   3 +-
 .../java/org/apache/tez/test/TestInput.java     |   3 +-
 .../java/org/apache/tez/test/TestProcessor.java |   3 +-
 .../tez/test/TestTaskErrorsUsingLocalMode.java  | 215 ++++++++++++++++++
 63 files changed, 1791 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41752a9..415a246 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3161. Allow task to report different kinds of errors - fatal / kill.
   TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection.
   TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing
   TEZ-3173. Update Tez AM REST APIs for more information for each vertex.

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 7204943..0b8c67d 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -69,6 +69,7 @@ public class ATSConstants {
   public static final String FINISH_TIME = "endTime";
   public static final String TIME_TAKEN = "timeTaken";
   public static final String STATUS = "status";
+  public static final String TASK_FAILURE_TYPE = "taskFailureType";
   public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum";
   public static final String DIAGNOSTICS = "diagnostics";
   public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 457b0de..b5e42bc 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -23,7 +23,9 @@ import java.util.List;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.UserPayload;
@@ -132,13 +134,41 @@ public interface TaskContext {
   public void notifyProgress();
 
   /**
-   * Report a fatal error to the framework. This will cause the entire task to
-   * fail and should not be used for reporting temporary or recoverable errors
+   * Report an error to the framework. This will cause the taskAttempt to fail, and should not be used
+   * to report errors which can be handled locally in the TaskAttempt. A new TaskAttempt will be launched
+   * depending upon how many retries are available for the task.
+   *
+   * @deprecated Replaced by {@link #reportFailure(TaskFailureType, Throwable, String)}.
+   *
+   * Note: To maintain compatibility, even though this method is named 'fatalError' - this method
+   * operates as {@link #reportFailure(TaskFailureType, Throwable, String)}
+   * with the TaskFailureType set to {@link TaskFailureType#NON_FATAL}.
    *
    * @param exception an exception representing the error
+   * @param message a diagnostic message which may be associated with the error
    */
+  @Deprecated
   public void fatalError(@Nullable Throwable exception, @Nullable String message);
 
+
+  /**
+   * Report an error to the framework. The attempt is terminated; the failure type decides whether the task is retried.
+   *
+   * @param taskFailureType the type of the error
+   * @param exception any exception that may be associated with the error
+   * @param message a diagnostic message which may be associated with the error
+   */
+  void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception, @Nullable String message);
+
+  /**
+   * Kill the currently running attempt.
+   * @param exception an associated exception
+   * @param message an associated diagnostic message
+   */
+  @Private
+  @Unstable
+  void killSelf(@Nullable Throwable exception, @Nullable String message);
+
   /**
    * Returns meta-data for the specified service. As an example, when the MR
    * ShuffleHandler is used - this would return the jobToken serialized as bytes

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java
new file mode 100644
index 0000000..41a665f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskFailureType.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public enum TaskFailureType {
+  /**
+   * Indicates an error, which can potentially be recovered from when another attempt is launched
+   */
+  NON_FATAL,
+
+  /**
+   * Indicates an error which is fatal; no more attempts will be made for the task
+   */
+  FATAL,
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index 71385b0..45a3c3f 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -200,6 +200,7 @@
               <imports>
                 <param>${basedir}/src/main/proto</param>
                 <param>${basedir}/../tez-api/src/main/proto</param>
+                <param>${basedir}/../tez-runtime-internals/src/main/proto</param>
               </imports>
               <source>
                 <directory>${basedir}/src/main/proto</directory>

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 403e1a1..36b74de 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -33,6 +33,8 @@ import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tez.Utils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
@@ -277,7 +279,8 @@ public class TaskCommunicatorManager extends AbstractService implements
           taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
               (TaskStatusUpdateEvent) tezEvent.getEvent());
         } else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT
-           || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT) {
+           || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT
+           || eventType == EventType.TASK_ATTEMPT_KILLED_EVENT) {
           taFinishedEvents.add(tezEvent);
         } else {
           if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
@@ -306,6 +309,7 @@ public class TaskCommunicatorManager extends AbstractService implements
         EventMetaData sourceMeta = e.getSourceInfo();
         switch (e.getEventType()) {
         case TASK_ATTEMPT_FAILED_EVENT:
+        case TASK_ATTEMPT_KILLED_EVENT:
           TaskAttemptTerminationCause errCause = null;
           switch (sourceMeta.getEventGenerator()) {
           case INPUT:
@@ -324,12 +328,19 @@ public class TaskCommunicatorManager extends AbstractService implements
             throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
                 sourceMeta.getEventGenerator());
           }
-          TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent();
-          sendEvent(
-               new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
-                   TaskAttemptEventType.TA_FAILED,
-                  "Error: " + taskFailedEvent.getDiagnostics(),
+          if (e.getEventType() == EventType.TASK_ATTEMPT_FAILED_EVENT) {
+            TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) e.getEvent();
+            sendEvent(
+                new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
+                    TaskAttemptEventType.TA_FAILED, taskFailedEvent.getTaskFailureType(),
+                    "Error: " + taskFailedEvent.getDiagnostics(),
                     errCause));
+          } else { // Killed
+            TaskAttemptKilledEvent taskKilledEvent = (TaskAttemptKilledEvent) e.getEvent();
+            sendEvent(
+                new TaskAttemptEventAttemptKilled(sourceMeta.getTaskAttemptID(),
+                    "Error: " + taskKilledEvent.getDiagnostics(), errCause));
+          }
           break;
         case TASK_ATTEMPT_COMPLETED_EVENT:
           sendEvent(
@@ -387,8 +398,9 @@ public class TaskCommunicatorManager extends AbstractService implements
     // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
     // instead of waiting for the unregister to flow through the Container.
     // Fix along the same lines as TEZ-2124 by introducing an explict context.
+    //TODO-3183. Allow the FailureType to be specified
     sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId,
-        TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+        TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
         taskAttemptEndReason)));
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 2f03474..296073a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -25,6 +25,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.TaskFailureType;
 
 
 /**
@@ -60,7 +61,7 @@ public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID>
   @Override
   protected void handleTimeOut(TezTaskAttemptID attemptId) {
     eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId,
-        TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString()
+        TaskAttemptEventType.TA_TIMED_OUT, TaskFailureType.NON_FATAL, "AttemptID:" + attemptId.toString()
         + " Timed out after " + timeOut / 1000 + " secs", TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR));
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 21c6b14..299847c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -18,29 +18,36 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import com.google.common.base.Preconditions;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.TaskFailureType;
 
 public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent 
   implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent, RecoveryEvent {
 
   private final String diagnostics;
   private final TaskAttemptTerminationCause errorCause;
-  private boolean isFromRecovery = false;
+  private final TaskFailureType taskFailureType;
+  private final boolean isFromRecovery;
 
   /* Accepted Types - FAILED, TIMED_OUT */
   public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
-      TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) {
-    super(id, type);
-    this.diagnostics = diagnostics;
-    this.errorCause = errorCause;
+                                       TaskAttemptEventType type, TaskFailureType taskFailureType,
+                                       String diagnostics,
+                                       TaskAttemptTerminationCause errorCause) {
+    this(id, type, taskFailureType, diagnostics, errorCause, false);
   }
 
   /* Accepted Types - FAILED, TIMED_OUT */
   public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
-      TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause,
-      boolean isFromRecovery) {
-    this(id, type, diagnostics, errorCause);
+                                       TaskAttemptEventType type, TaskFailureType taskFailureType, String diagnostics, TaskAttemptTerminationCause errorCause,
+                                       boolean isFromRecovery) {
+    super(id, type);
+    Preconditions.checkNotNull(taskFailureType, "FailureType must be set for a FAILED task attempt");
+    this.diagnostics = diagnostics;
+    this.errorCause = errorCause;
+    this.taskFailureType = taskFailureType;
     this.isFromRecovery = isFromRecovery;
   }
 
@@ -58,4 +65,8 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
   public boolean isFromRecovery() {
     return isFromRecovery;
   }
+
+  public TaskFailureType getTaskFailureType() {
+    return taskFailureType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java
new file mode 100644
index 0000000..d6f1526
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAFailed.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import com.google.common.base.Preconditions;
+import org.apache.tez.common.TezAbstractEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.TaskFailureType;
+
+@SuppressWarnings("rawtypes")
+public class TaskEventTAFailed extends TaskEventTAUpdate {
+
+  private final TezAbstractEvent causalEvent;
+  private final TaskFailureType taskFailureType;
+
+  public TaskEventTAFailed(TezTaskAttemptID id, TaskFailureType taskFailureType, TezAbstractEvent causalEvent) {
+    super(id, TaskEventType.T_ATTEMPT_FAILED);
+    Preconditions.checkNotNull(taskFailureType, "FailureType must be specified for a failed attempt");
+    this.taskFailureType = taskFailureType;
+    this.causalEvent = causalEvent;
+  }
+
+  public TezAbstractEvent getCausalEvent() {
+    return causalEvent;
+  }
+
+  public TaskFailureType getTaskFailureType() {
+    return taskFailureType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java
new file mode 100644
index 0000000..c410361
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAKilled.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.common.TezAbstractEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+@SuppressWarnings("rawtypes")
+public class TaskEventTAKilled extends TaskEventTAUpdate {
+
+  private final TezAbstractEvent causalEvent;
+
+  public TaskEventTAKilled(TezTaskAttemptID id, TezAbstractEvent causalEvent) {
+    super(id, TaskEventType.T_ATTEMPT_KILLED);
+    this.causalEvent = causalEvent;
+  }
+
+  public TezAbstractEvent getCausalEvent() {
+    return causalEvent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java
new file mode 100644
index 0000000..a9d46c5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTALaunched.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+@SuppressWarnings("rawtypes")
+public class TaskEventTALaunched extends TaskEventTAUpdate {
+
+  public TaskEventTALaunched(TezTaskAttemptID id) {
+    super(id, TaskEventType.T_ATTEMPT_LAUNCHED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java
new file mode 100644
index 0000000..e74e2bf
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTASucceeded.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+@SuppressWarnings("rawtypes")
+public class TaskEventTASucceeded extends TaskEventTAUpdate {
+
+  public TaskEventTASucceeded(TezTaskAttemptID id) {
+    super(id, TaskEventType.T_ATTEMPT_SUCCEEDED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
index 01eaf5b..346c9d1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTAUpdate.java
@@ -18,31 +18,19 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 @SuppressWarnings("rawtypes")
-public class TaskEventTAUpdate extends TaskEvent {
+public abstract class TaskEventTAUpdate extends TaskEvent {
 
   private TezTaskAttemptID attemptID;
-  private TezAbstractEvent causalEvent;
 
   public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type) {
-    this(id, type, null);
-  }
-
-  public TaskEventTAUpdate(TezTaskAttemptID id, TaskEventType type, TezAbstractEvent causalEvent) {
     super(id.getTaskID(), type);
     this.attemptID = id;
-    this.causalEvent = causalEvent;
   }
-  
+
   public TezTaskAttemptID getTaskAttemptID() {
     return attemptID;
   }
-  
-  public TezAbstractEvent getCausalEvent() {
-    return causalEvent;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 702c323..6169a7b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -31,6 +31,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
+import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
+import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
+import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -86,8 +92,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
@@ -1058,7 +1062,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
-        getFinishTime(), TaskAttemptState.SUCCEEDED, null,
+        getFinishTime(), TaskAttemptState.SUCCEEDED, null, null,
         "", getCounters(), lastDataEvents, taGeneratedEvents,
         creationTime, creationCausalTA, allocationTime,
         null, null, null, null, null);
@@ -1068,10 +1072,13 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   protected void logJobHistoryAttemptUnsuccesfulCompletion(
-      TaskAttemptState state) {
+      TaskAttemptState state, TaskFailureType taskFailureType) {
     Preconditions.checkArgument(recoveryData == null
         || recoveryData.getTaskAttemptFinishedEvent() == null,
         "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent");
+    if (state == TaskAttemptState.FAILED && taskFailureType == null) {
+      throw new IllegalStateException("FAILED state must be accompanied by a FailureType");
+    }
     long finishTime = getFinishTime();
     ContainerId unsuccessfulContainerId = null;
     NodeId unsuccessfulContainerNodeId = null;
@@ -1087,6 +1094,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
         finishTime, state,
+        taskFailureType,
         terminationCause,
         StringUtils.join(
             getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents,
@@ -1169,6 +1177,7 @@ public class TaskAttemptImpl implements TaskAttempt,
                   + ", attemptId=" + ta.attemptId);
             }
             ta.sendEvent(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+                taFinishedEvent.getTaskFailureType(),
                 taFinishedEvent.getDiagnostics(), taFinishedEvent.getTaskAttemptError(), true));
             break;
           case KILLED:
@@ -1210,7 +1219,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         LOG.error(msg, e);
         String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
         new TerminateTransition(FAILED_HELPER).transition(ta,
-            new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
+            new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diag,
                 TaskAttemptTerminationCause.APPLICATION_ERROR));
         return TaskAttemptStateInternal.FAILED;
       }
@@ -1325,7 +1334,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           ta.recoveryData.getTaskAttemptFinishedEvent() == null) {
         ta.setFinishTime();
         ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
-            .getTaskAttemptState());
+            .getTaskAttemptState(), helper.getFailureType(event));
       } else {
         ta.finishTime = ta.recoveryData.getTaskAttemptFinishedEvent().getFinishTime();
       }
@@ -1341,8 +1350,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           helper.getTaskAttemptState()));
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
-      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
-          .getTaskEventType(), event));
+      ta.sendEvent(helper.getTaskEvent(ta.attemptId,  event));
     }
   }
 
@@ -1393,8 +1401,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       }
 
       // Inform the Task
-      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
-          TaskEventType.T_ATTEMPT_LAUNCHED));
+      ta.sendEvent(new TaskEventTALaunched(ta.attemptId));
       
       if (ta.isSpeculationEnabled()) {
         ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
@@ -1503,6 +1510,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           ta.sendEvent(
               new TaskAttemptEventAttemptFailed(ta.getID(),
                   TaskAttemptEventType.TA_FAILED,
+                  TaskFailureType.NON_FATAL,
                   diagnostics, 
                   TaskAttemptTerminationCause.NO_PROGRESS)
               );
@@ -1567,8 +1575,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier()));
 
       // Inform the task.
-      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
-          TaskEventType.T_ATTEMPT_SUCCEEDED));
+      ta.sendEvent(new TaskEventTASucceeded(ta.attemptId));
 
       // Unregister from the TaskHeartbeatHandler.
       ta.taskHeartbeatHandler.unregister(ta.attemptId);
@@ -1788,11 +1795,13 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   protected interface TerminatedTransitionHelper {
 
-    public TaskAttemptStateInternal getTaskAttemptStateInternal();
+    TaskAttemptStateInternal getTaskAttemptStateInternal();
+
+    TaskAttemptState getTaskAttemptState();
 
-    public TaskAttemptState getTaskAttemptState();
+    TaskEvent getTaskEvent(TezTaskAttemptID taskAttemptId, TaskAttemptEvent event);
 
-    public TaskEventType getTaskEventType();
+    TaskFailureType getFailureType(TaskAttemptEvent event);
   }
 
   protected static class FailedTransitionHelper implements
@@ -1807,8 +1816,19 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
 
     @Override
-    public TaskEventType getTaskEventType() {
-      return TaskEventType.T_ATTEMPT_FAILED;
+    public TaskEvent getTaskEvent(TezTaskAttemptID taskAttemptId,
+                                  TaskAttemptEvent event) {
+      return new TaskEventTAFailed(taskAttemptId, getFailureType(event), event);
+    }
+
+    @Override
+    public TaskFailureType getFailureType(TaskAttemptEvent event) {
+      if (event instanceof TaskAttemptEventAttemptFailed) {
+        return ( ((TaskAttemptEventAttemptFailed) event).getTaskFailureType());
+      } else {
+        // For alternate failure scenarios like OUTPUT_FAILED, CONTAINER_TERMINATING, NODE_FAILED, etc
+        return TaskFailureType.NON_FATAL;
+      }
     }
   }
 
@@ -1826,8 +1846,14 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
 
     @Override
-    public TaskEventType getTaskEventType() {
-      return TaskEventType.T_ATTEMPT_KILLED;
+    public TaskEvent getTaskEvent(TezTaskAttemptID taskAttemptId,
+                                  TaskAttemptEvent event) {
+      return new TaskEventTAKilled(taskAttemptId, event);
+    }
+
+    @Override
+    public TaskFailureType getFailureType(TaskAttemptEvent event) {
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9217e84..26ba004 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -37,6 +37,8 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1166,7 +1168,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       task.getVertex().incrementFailedTaskAttemptCount();
-      TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+      TaskEventTAFailed castEvent = (TaskEventTAFailed) event;
       schedulingCausalTA = castEvent.getTaskAttemptID();
       task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " failed,"
           + " info=" + task.getAttempt(castEvent.getTaskAttemptID()).getDiagnostics());
@@ -1177,7 +1179,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       // The attempt would have informed the scheduler about it's failure
 
       task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
-      if (task.failedAttempts < task.maxFailedAttempts) {
+      if (task.failedAttempts < task.maxFailedAttempts &&
+          castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) {
         task.handleTaskAttemptCompletion(
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);
@@ -1189,9 +1192,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           task.addAndScheduleAttempt(getSchedulingCausalTA());
         }
       } else {
-        LOG.info("Failing task: " + task.getTaskId()
-            + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: "
-            + task.maxFailedAttempts);
+        if (castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) {
+          LOG.info(
+              "Failing task: {} due to too many failed attempts. currentFailedAttempts={}, maxFailedAttempts={}",
+              task.getTaskId(), task.failedAttempts, task.maxFailedAttempts);
+        } else {
+          LOG.info(
+              "Failing task: {} due to {} error reported by TaskAttempt. CurrentFailedAttempts={}",
+              task.getTaskId(), TaskFailureType.FATAL, task.failedAttempts);
+        }
         task.handleTaskAttemptCompletion(
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);
@@ -1225,7 +1234,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.internalError(event.getType());
       }
 
-      TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+      TaskEventTAFailed castEvent = (TaskEventTAFailed) event;
       TezTaskAttemptID failedAttemptId = castEvent.getTaskAttemptID();
       TaskAttempt failedAttempt = task.getAttempt(failedAttemptId);
       ContainerId containerId = failedAttempt.getAssignedContainerID();

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 8e31a25..5a9d8c9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -18,11 +18,14 @@
 
 package org.apache.tez.dag.history.events;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 
+import org.apache.tez.common.TezConverterUtils;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +60,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private long finishTime;
   private TezTaskAttemptID creationCausalTA;
   private TaskAttemptState state;
+  private TaskFailureType taskFailureType;
   private String diagnostics;
   private TezCounters tezCounters;
   private TaskAttemptTerminationCause error;
@@ -73,6 +77,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       long startTime,
       long finishTime,
       TaskAttemptState state,
+      @Nullable TaskFailureType taskFailureType,
       TaskAttemptTerminationCause error,
       String diagnostics, TezCounters counters, 
       List<DataEventDependencyInfo> dataEvents,
@@ -93,6 +98,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.startTime = startTime;
     this.finishTime = finishTime;
     this.state = state;
+    this.taskFailureType = taskFailureType;
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
     this.error = error;
@@ -136,6 +142,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         .setAllocationTime(allocationTime)
         .setStartTime(startTime)
         .setFinishTime(finishTime);
+    if (taskFailureType != null) {
+      builder.setTaskFailureType(TezConverterUtils.failureTypeToProto(taskFailureType));
+    }
     if (creationCausalTA != null) {
       builder.setCreationCausalTA(creationCausalTA.toString());
     }
@@ -177,6 +186,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.allocationTime = proto.getAllocationTime();
     this.startTime = proto.getStartTime();
     this.finishTime = proto.getFinishTime();
+    if (proto.hasTaskFailureType()) {
+      this.taskFailureType = TezConverterUtils.failureTypeFromProto(proto.getTaskFailureType());
+    }
     if (proto.hasCreationCausalTA()) {
       this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA());
     }
@@ -244,6 +256,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", finishTime=" + finishTime
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
+        + ", taskFailureType=" + taskFailureType
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
         + ", containerId=" + (containerId != null ? containerId.toString() : "")
@@ -276,6 +289,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     return state;
   }
 
+  public TaskFailureType getTaskFailureType() {
+    return taskFailureType;
+  }
+
   public long getStartTime() {
     return startTime;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 9bca440..75116c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -548,6 +548,9 @@ public class HistoryEventJsonConversion {
     if (event.getTaskAttemptError() != null) {
       otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
     }
+    if (event.getTaskFailureType() != null) {
+      otherInfo.put(ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name());
+    }
     otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     otherInfo.put(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToJSON(event.getCounters()));

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index c551b09..4990d95 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -141,7 +141,9 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
                   @Nullable String diagnostics);
 
   /**
-   * Inform the framework that a task has failed
+   * Inform the framework that a task has failed. This, at the moment, is always treated as
+   * an error which will cause a retry of the task to be triggered, if there are enough retry
+   * attempts left.
    *
    * @param taskAttemptId        the relevant task attempt id
    * @param taskAttemptEndReason the reason for the task failure

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index f3aeed4..ff3707d 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -22,6 +22,7 @@ option java_generate_equals_and_hash = true;
 
 import "DAGApiRecords.proto";
 import "Events.proto";
+import "RuntimeEvents.proto";
 
 message AMLaunchedProto {
   optional string application_attempt_id = 1;
@@ -185,6 +186,7 @@ message TaskAttemptFinishedProto {
   optional int64 finish_time = 5;
   optional string creation_causal_t_a = 6;
   optional int32 state = 7;
+  optional TaskFailureTypeProto task_failure_type = 16;
   optional string diagnostics = 8;
   optional TezCountersProto counters = 9;
   optional string error_enum = 10;

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 12e75a7..962b230 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -702,7 +702,7 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID, ta0t2v2StartedEvent));
     TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent(
         ta0t2v2Id, "v1", 500L, 600L, 
-        TaskAttemptState.SUCCEEDED, null, "", null, 
+        TaskAttemptState.SUCCEEDED, null, null, "", null,
         null, null, 0L, null, 0L, null, null, null, null, null);
     rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
index 9700524..4950e09 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
@@ -17,14 +17,20 @@ package org.apache.tez.dag.app;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 
@@ -38,12 +44,21 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
@@ -54,6 +69,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -62,59 +78,30 @@ public class TestTaskCommunicatorManager2 {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testTaskAttemptFailedKilled() throws IOException, TezException {
-    ApplicationId appId = ApplicationId.newInstance(1000, 1);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    Credentials credentials = new Credentials();
-    AppContext appContext = mock(AppContext.class);
-    EventHandler eventHandler = mock(EventHandler.class);
-    DAG dag = mock(DAG.class);
-    AMContainerMap amContainerMap = mock(AMContainerMap.class);
-    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
-    doReturn(eventHandler).when(appContext).getEventHandler();
-    doReturn(dag).when(appContext).getCurrentDAG();
-    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    doReturn(credentials).when(appContext).getAppCredentials();
-    doReturn(appAcls).when(appContext).getApplicationACLs();
-    doReturn(amContainerMap).when(appContext).getAllContainers();
-    NodeId nodeId = NodeId.newInstance("localhost", 0);
-    AMContainer amContainer = mock(AMContainer.class);
-    Container container = mock(Container.class);
-    doReturn(nodeId).when(container).getNodeId();
-    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
-    doReturn(container).when(amContainer).getContainer();
 
-    Configuration conf = new TezConfiguration();
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-    TaskCommunicatorManager taskAttemptListener =
-        new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class),
-            mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor(
-            TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload)));
-
-    TaskSpec taskSpec1 = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID();
+    TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest();
+
+    TaskSpec taskSpec1 = wrapper.createTaskSpec();
     AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
 
-    TaskSpec taskSpec2 = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
+    TaskSpec taskSpec2 = wrapper.createTaskSpec();
     AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
 
-    ContainerId containerId1 = createContainerId(appId, 1);
-    taskAttemptListener.registerRunningContainer(containerId1, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0);
-    ContainerId containerId2 = createContainerId(appId, 2);
-    taskAttemptListener.registerRunningContainer(containerId2, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0);
+    ContainerId containerId1 = wrapper.createContainerId(1);
+    wrapper.registerRunningContainer(containerId1);
+    wrapper.registerTaskAttempt(containerId1, amContainerTask1);
 
+    ContainerId containerId2 = wrapper.createContainerId(2);
+    wrapper.registerRunningContainer(containerId2);
+    wrapper.registerTaskAttempt(containerId2, amContainerTask2);
 
-    taskAttemptListener
-        .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
-    taskAttemptListener
-        .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
+    wrapper.getTaskCommunicatorManager().taskFailed(amContainerTask1.getTask().getTaskAttemptID(),
+        TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
+    wrapper.getTaskCommunicatorManager().taskKilled(amContainerTask2.getTask().getTaskAttemptID(),
+        TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
 
     ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(eventHandler, times(2)).handle(argumentCaptor.capture());
+    verify(wrapper.getEventHandler(), times(2)).handle(argumentCaptor.capture());
     assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
     assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled);
     TaskAttemptEventAttemptFailed failedEvent =
@@ -128,13 +115,153 @@ public class TestTaskCommunicatorManager2 {
 
     assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo());
     assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause());
-    // TODO TEZ-2003. Verify unregistration from the registered list
+//   TODO TEZ-2003. Verify unregistration from the registered list
   }
 
-  @SuppressWarnings("deprecation")
-  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx);
-    return containerId;
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testTaskAttemptFailureViaHeartbeatNonFatal() throws IOException, TezException {
+
+    TaskCommunicatorManagerWrapperForTest wrapper = new TaskCommunicatorManagerWrapperForTest();
+
+    TaskSpec taskSpec1 = wrapper.createTaskSpec();
+    AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
+
+    TaskSpec taskSpec2 = wrapper.createTaskSpec();
+    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
+
+    ContainerId containerId1 = wrapper.createContainerId(1);
+    wrapper.registerRunningContainer(containerId1);
+    wrapper.registerTaskAttempt(containerId1, amContainerTask1);
+
+    ContainerId containerId2 = wrapper.createContainerId(2);
+    wrapper.registerRunningContainer(containerId2);
+    wrapper.registerTaskAttempt(containerId2, amContainerTask2);
+
+    List<TezEvent> events = new LinkedList<>();
+
+    EventMetaData sourceInfo1 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "testVertex", null,
+            taskSpec1.getTaskAttemptID());
+    TaskAttemptFailedEvent failedEvent1 = new TaskAttemptFailedEvent("non-fatal test error",
+        TaskFailureType.NON_FATAL);
+    TezEvent failedEventT1 = new TezEvent(failedEvent1, sourceInfo1);
+    events.add(failedEventT1);
+    TaskHeartbeatRequest taskHeartbeatRequest1 =
+        new TaskHeartbeatRequest(containerId1.toString(), taskSpec1.getTaskAttemptID(), events, 0,
+            0, 0);
+    wrapper.getTaskCommunicatorManager().heartbeat(taskHeartbeatRequest1);
+
+    ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture());
+    assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
+    TaskAttemptEventAttemptFailed failedEvent =
+        (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
+    assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType());
+    assertTrue(failedEvent.getDiagnosticInfo().contains("non-fatal"));
+
+    events.clear();
+    reset(wrapper.getEventHandler());
+
+    EventMetaData sourceInfo2 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "testVertex", null,
+            taskSpec2.getTaskAttemptID());
+    TaskAttemptFailedEvent failedEvent2 = new TaskAttemptFailedEvent("-fatal- test error",
+        TaskFailureType.FATAL);
+    TezEvent failedEventT2 = new TezEvent(failedEvent2, sourceInfo2);
+    events.add(failedEventT2);
+    TaskHeartbeatRequest taskHeartbeatRequest2 =
+        new TaskHeartbeatRequest(containerId2.toString(), taskSpec2.getTaskAttemptID(), events, 0,
+            0, 0);
+    wrapper.getTaskCommunicatorManager().heartbeat(taskHeartbeatRequest2);
+
+    argumentCaptor = ArgumentCaptor.forClass(Event.class);
+    verify(wrapper.getEventHandler(), times(1)).handle(argumentCaptor.capture());
+    assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
+    failedEvent = (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
+    assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType());
+    assertTrue(failedEvent.getDiagnosticInfo().contains("-fatal-"));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static class TaskCommunicatorManagerWrapperForTest {
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    Credentials credentials = new Credentials();
+    AppContext appContext = mock(AppContext.class);
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = mock(DAG.class);
+    Vertex vertex = mock(Vertex.class);
+    TezDAGID dagId;
+    TezVertexID vertexId;
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+    Configuration conf = new TezConfiguration();
+    UserPayload userPayload;
+    TaskCommunicatorManager taskCommunicatorManager;
+    private AtomicInteger taskIdCounter = new AtomicInteger(0);
+
+    TaskCommunicatorManagerWrapperForTest() throws IOException, TezException {
+      dagId = TezDAGID.getInstance(appId, 1);
+      vertexId = TezVertexID.getInstance(dagId, 100);
+      doReturn(eventHandler).when(appContext).getEventHandler();
+      doReturn(dag).when(appContext).getCurrentDAG();
+      doReturn(vertex).when(dag).getVertex(eq(vertexId));
+      doReturn(new TaskAttemptEventInfo(0, new LinkedList<TezEvent>(), 0)).when(vertex)
+          .getTaskAttemptTezEvents(any(TezTaskAttemptID.class), anyInt(), anyInt(), anyInt());
+      doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+      doReturn(credentials).when(appContext).getAppCredentials();
+      doReturn(appAcls).when(appContext).getApplicationACLs();
+      doReturn(amContainerMap).when(appContext).getAllContainers();
+      doReturn(new SystemClock()).when(appContext).getClock();
+
+      NodeId nodeId = NodeId.newInstance("localhost", 0);
+      AMContainer amContainer = mock(AMContainer.class);
+      Container container = mock(Container.class);
+      doReturn(nodeId).when(container).getNodeId();
+      doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+      doReturn(container).when(amContainer).getContainer();
+
+      userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+      taskCommunicatorManager =
+          new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class),
+              mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor(
+              TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload)));
+    }
+
+
+    TaskCommunicatorManager getTaskCommunicatorManager() {
+      return taskCommunicatorManager;
+    }
+
+    EventHandler getEventHandler() {
+      return eventHandler;
+    }
+
+    private void registerRunningContainer(ContainerId containerId) {
+      taskCommunicatorManager.registerRunningContainer(containerId, 0);
+    }
+
+    private void registerTaskAttempt(ContainerId containerId, AMContainerTask amContainerTask) {
+      taskCommunicatorManager.registerTaskAttempt(amContainerTask, containerId, 0);
+    }
+
+    private TaskSpec createTaskSpec() {
+      TaskSpec taskSpec = mock(TaskSpec.class);
+      TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIdCounter.incrementAndGet());
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+      doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
+      return taskSpec;
+    }
+
+
+    @SuppressWarnings("deprecation")
+    private ContainerId createContainerId(int id) {
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
+      return containerId;
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 0b0af7b..be1821b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -87,16 +87,12 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.app.dag.impl.TestVertexImpl.EventHandlingRootInputInitializer;
 import org.apache.tez.dag.app.rm.AMSchedulerEvent;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.TaskSchedulerManager;
@@ -105,7 +101,6 @@ import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
@@ -121,6 +116,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.InputSpecUpdate;
@@ -130,7 +126,6 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.After;
 import org.junit.Assert;
@@ -883,7 +878,7 @@ public class TestDAGRecovery {
     taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata));
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", 0L, 0L, 
-        TaskAttemptState.SUCCEEDED, null, "", null, 
+        TaskAttemptState.SUCCEEDED, null, null, "", null,
         null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
@@ -941,7 +936,7 @@ public class TestDAGRecovery {
     initMockDAGRecoveryDataForTaskAttempt();
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
-        TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, 
+        TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null,
         null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -970,7 +965,7 @@ public class TestDAGRecovery {
     initMockDAGRecoveryDataForTaskAttempt();
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
-        TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, 
+        TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
         null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -1030,7 +1025,7 @@ public class TestDAGRecovery {
         sourceInfo));
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
-        TaskAttemptState.SUCCEEDED, null, "", null, 
+        TaskAttemptState.SUCCEEDED, null, null, "", null,
         null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -1068,7 +1063,7 @@ public class TestDAGRecovery {
     taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata));
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime, 
-        TaskAttemptState.SUCCEEDED, null, "", null, 
+        TaskAttemptState.SUCCEEDED, null, null, "", null,
         null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id);   
@@ -1119,7 +1114,7 @@ public class TestDAGRecovery {
         mock(NodeId.class), "", "", "");
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
-        TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, 
+        TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null,
         null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -1150,7 +1145,7 @@ public class TestDAGRecovery {
         mock(NodeId.class), "", "", "");
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime, 
-        TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, 
+        TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
         null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index feb7585..68cfc39 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
+import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
+import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -117,10 +121,14 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestTaskAttempt.class);
+
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -417,9 +425,11 @@ public class TestTaskAttempt {
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
 
-    verifyEventType(
+    Event event = verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+            expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1);
+    TaskEventTAFailed failedEvent = (TaskEventTAFailed) event;
+    assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType());
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
@@ -453,7 +463,7 @@ public class TestTaskAttempt {
     TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
-    MockEventHandler eventHandler = new MockEventHandler();
+    MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
@@ -486,10 +496,13 @@ public class TestTaskAttempt {
         mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
     // At state STARTING.
     taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
         null));
+    int expectedEventsAtRunning = 3;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     assertEquals("Task attempt is not in running state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
@@ -503,6 +516,30 @@ public class TestTaskAttempt {
     assertEquals("Terminated", taImpl.getDiagnostics().get(0));
     assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, taImpl.getTerminationCause());
     // TODO Ensure TA_TERMINATING after this is ingored.
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+    Event event = verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1);
+    TaskEventTAFailed failedEvent = (TaskEventTAFailed) event;
+    assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
+
+    taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID,
+        "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
+    // verify unregister is not invoked again
+    verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
+    int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
   }
 
   @Test(timeout = 5000)
@@ -575,7 +612,7 @@ public class TestTaskAttempt {
 
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+            expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
@@ -744,7 +781,116 @@ public class TestTaskAttempt {
     
     taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
     
-    taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0",
+    taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED,
+        TaskFailureType.NON_FATAL, "0",
+        TaskAttemptTerminationCause.APPLICATION_ERROR));
+
+    assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID);
+    assertEquals(1, taImpl.getDiagnostics().size());
+    assertEquals("0", taImpl.getDiagnostics().get(0));
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
+
+    assertEquals(TaskAttemptStateInternal.FAIL_IN_PROGRESS, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEventTezEventUpdate(taImpl.getID(), Collections.EMPTY_LIST));
+    assertFalse(
+        "InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in FAIL_IN_PROGRESS state",
+        eventHandler.internalError);
+
+    taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1",
+        TaskAttemptTerminationCause.CONTAINER_EXITED));
+    // verify unregister is not invoked again
+    verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
+    assertEquals(2, taImpl.getDiagnostics().size());
+    assertEquals("1", taImpl.getDiagnostics().get(1));
+    // err cause does not change
+    assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+
+    Event e = verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1);
+    TaskEventTAFailed failedEvent = (TaskEventTAFailed) e;
+    assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
+  }
+
+  @Test(timeout = 5000)
+  public void testFailureFatalError() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    int expectedEventsAtRunning = 4;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+    verifyEventType(
+        arg.getAllValues().subList(0,
+            expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
+
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+
+    taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED,
+        TaskFailureType.FATAL, "0",
         TaskAttemptTerminationCause.APPLICATION_ERROR));
 
     assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", taImpl.getInternalState(),
@@ -776,8 +922,9 @@ public class TestTaskAttempt {
 
     Event e = verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
-    assertEquals(TaskEventType.T_ATTEMPT_FAILED, e.getType());
+            expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1);
+    TaskEventTAFailed failedEvent = (TaskEventTAFailed) e;
+    assertEquals(TaskFailureType.FATAL, failedEvent.getTaskFailureType());
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
@@ -868,6 +1015,7 @@ public class TestTaskAttempt {
     TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue();
     assertEquals(taImpl.getID(), fEvent.getTaskAttemptID());
     assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause());
+    assertEquals(TaskFailureType.NON_FATAL, fEvent.getTaskFailureType());
     taImpl.handle(fEvent);
 
     assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", taImpl.getInternalState(),
@@ -982,7 +1130,7 @@ public class TestTaskAttempt {
 
     Event e = verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+            expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1);
     assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType());
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
@@ -1067,7 +1215,7 @@ public class TestTaskAttempt {
 
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEventsAfterTerminating), TaskEventTAUpdate.class, 1);
+            expectedEventsAfterTerminating), TaskEventTASucceeded.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1);
@@ -1159,7 +1307,8 @@ public class TestTaskAttempt {
 
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+            expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1);
+
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
@@ -1182,7 +1331,7 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture());
     verifyEventType(
         arg.getAllValues().subList(expectedEvenstAfterTerminating,
-            expectedEventsNodeFailure), TaskEventTAUpdate.class, 1);
+            expectedEventsNodeFailure), TaskEventTAKilled.class, 1);
 
     // Verify still in KILLED state
     assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED,
@@ -1266,7 +1415,7 @@ public class TestTaskAttempt {
 
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
-            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+            expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
@@ -1392,6 +1541,7 @@ public class TestTaskAttempt {
     verify(mockHistHandler, times(3)).handle(histArg.capture());
     histEvent = histArg.getValue();
     finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+    assertEquals(TaskFailureType.NON_FATAL, finishEvent.getTaskFailureType());
     long newFinishTime = finishEvent.getFinishTime();
     Assert.assertEquals(finishTime, newFinishTime);
 
@@ -1399,9 +1549,11 @@ public class TestTaskAttempt {
     int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
     arg.getAllValues().clear();
     verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture());
-    verifyEventType(
+    Event e = verifyEventType(
         arg.getAllValues().subList(expectedEventsTillSucceeded,
-            expectedEventsAfterFetchFailure), TaskEventTAUpdate.class, 1);
+            expectedEventsAfterFetchFailure), TaskEventTAFailed.class, 1);
+    TaskEventTAFailed failedEvent = (TaskEventTAFailed) e;
+    assertEquals(TaskFailureType.NON_FATAL, failedEvent.getTaskFailureType());
 
     taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1));
     assertEquals("Task attempt is not in FAILED state, still",
@@ -1614,9 +1766,9 @@ public class TestTaskAttempt {
 
     @Override
     protected void logJobHistoryAttemptUnsuccesfulCompletion(
-        TaskAttemptState state) {
+        TaskAttemptState state, TaskFailureType taskFailureType) {
       taskAttemptFinishedEventLogged++;
-      super.logJobHistoryAttemptUnsuccesfulCompletion(state);
+      super.logJobHistoryAttemptUnsuccesfulCompletion(state, taskFailureType);
     }
     
     @Override


Mime
View raw message