tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/3] tez git commit: TEZ-3161. Allow task to report different kinds of errors - fatal / kill (sseth)
Date Wed, 06 Apr 2016 03:41:50 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1ed7ecb..f88ab7c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -31,6 +31,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tez.common.TezAbstractEvent;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
+import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
+import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
+import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
@@ -65,7 +72,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
-import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -131,6 +137,7 @@ public class TestTaskImpl {
   @Before
   public void setup() {
     conf = new Configuration();
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 4);
     taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class);
     taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     credentials = new Credentials();
@@ -180,6 +187,13 @@ public class TestTaskImpl {
     assertEquals(locationHint, mockTask.getTaskLocationHint());
   }
 
+  private void scheduleTaskAttempt(TezTaskID taskId, TaskState expectedState) {
+    mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false));
+    assertEquals(expectedState, mockTask.getState());
+    assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
+    assertEquals(locationHint, mockTask.getTaskLocationHint());
+  }
+
   private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
     EventMetaData eventMetaData = new EventMetaData();
     DataMovementEvent dmEvent = DataMovementEvent.create(null);
@@ -199,16 +213,44 @@ public class TestTaskImpl {
     assertTaskKillWaitState();
   }
 
+  private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId) {
+    return createTaskTAKilledEvent(taskAttemptId, null);
+  }
+
+  private TaskEventTAKilled createTaskTAKilledEvent(TezTaskAttemptID taskAttemptId,
+                                                    TezAbstractEvent causalEvent) {
+    return new TaskEventTAKilled(taskAttemptId, causalEvent);
+  }
+
+  private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId) {
+    return createTaskTAFailedEvent(taskAttemptId, TaskFailureType.NON_FATAL, null);
+  }
+
+  private TaskEventTAFailed createTaskTAFailedEvent(TezTaskAttemptID taskAttemptId,
+                                                    TaskFailureType taskFailureType,
+                                                    TezAbstractEvent causalEvent) {
+    return new TaskEventTAFailed(taskAttemptId, taskFailureType, causalEvent);
+  }
+
+  private TaskEventTALaunched createTaskTALauncherEvent(TezTaskAttemptID taskAttemptId) {
+    return new TaskEventTALaunched(taskAttemptId);
+  }
+
+  private TaskEventTASucceeded createTaskTASucceededEvent(TezTaskAttemptID taskAttemptId) {
+    return new TaskEventTASucceeded(taskAttemptId);
+  }
+
+  private TaskEvent createTaskTAAddSpecAttempt(TezTaskAttemptID taskAttemptId) {
+    return new TaskEvent(taskAttemptId.getTaskID(), TaskEventType.T_ADD_SPEC_ATTEMPT);
+  }
 
   private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
-    mockTask.handle(new TaskEventTAUpdate(attemptId,
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(attemptId));
     assertTaskScheduledState();
   }
 
   private void launchTaskAttempt(TezTaskAttemptID attemptId) {
-    mockTask.handle(new TaskEventTAUpdate(attemptId,
-        TaskEventType.T_ATTEMPT_LAUNCHED));
+    mockTask.handle(createTaskTALauncherEvent(attemptId));
     assertTaskRunningState();
   }
 
@@ -222,17 +264,21 @@ public class TestTaskImpl {
   }
 
   private void killRunningTaskAttempt(TezTaskAttemptID attemptId) {
-    mockTask.handle(new TaskEventTAUpdate(attemptId,
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(attemptId));
     assertTaskRunningState();
     verify(mockTask.getVertex(), times(1)).incrementKilledTaskAttemptCount();
   }
 
   private void failRunningTaskAttempt(TezTaskAttemptID attemptId) {
+    failRunningTaskAttempt(attemptId, true);
+  }
+
+  private void failRunningTaskAttempt(TezTaskAttemptID attemptId, boolean verifyState) {
     int failedAttempts = mockTask.failedAttempts;
-    mockTask.handle(new TaskEventTAUpdate(attemptId,
-        TaskEventType.T_ATTEMPT_FAILED));
-    assertTaskRunningState();
+    mockTask.handle(createTaskTAFailedEvent(attemptId));
+    if (verifyState) {
+      assertTaskRunningState();
+    }
     Assert.assertEquals(failedAttempts + 1, mockTask.failedAttempts);
     verify(mockTask.getVertex(), times(failedAttempts + 1)).incrementFailedTaskAttemptCount();
   }
@@ -310,13 +356,50 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     killTask(taskId);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
     verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
   }
 
   @Test(timeout = 5000)
+  public void testTooManyFailedAttempts() {
+    LOG.info("--- START: testTooManyFailedAttempts ---");
+    TezTaskID taskId = getNewTaskID();
+    // setup() sets TEZ_AM_TASK_MAX_FAILED_ATTEMPTS to 4. Each of the first three failures
+    // must leave the task RUNNING (asserted inside failRunningTaskAttempt). Only the first
+    // schedule request is expected to move the task to SCHEDULED; the later ones are issued
+    // while the task is already RUNNING.
+    for (int i = 0; i < 3; i++) {
+      scheduleTaskAttempt(taskId, i == 0 ? TaskState.SCHEDULED : TaskState.RUNNING);
+      launchTaskAttempt(mockTask.getLastAttempt().getID());
+      failRunningTaskAttempt(mockTask.getLastAttempt().getID());
+    }
+
+    // The fourth failure reaches the configured limit and fails the task itself, so the
+    // RUNNING-state check in failRunningTaskAttempt is skipped for this attempt.
+    scheduleTaskAttempt(taskId, TaskState.RUNNING);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    failRunningTaskAttempt(mockTask.getLastAttempt().getID(), false);
+
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
+  }
+
+  @Test(timeout = 5000)
+  public void testFailedAttemptWithFatalError() {
+    LOG.info("--- START: testFailedAttemptWithFatalError ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId, TaskState.SCHEDULED);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    mockTask.handle(
+        createTaskTAFailedEvent(mockTask.getLastAttempt().getID(), TaskFailureType.FATAL, null));
+
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+    assertEquals(1, mockTask.failedAttempts);
+    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
+  }
+
+  @Test(timeout = 5000)
   public void testKillRunningTaskPreviousKilledAttempts() {
     LOG.info("--- START: testKillRunningTaskPreviousKilledAttempts ---");
     TezTaskID taskId = getNewTaskID();
@@ -325,8 +408,7 @@ public class TestTaskImpl {
     killRunningTaskAttempt(mockTask.getLastAttempt().getID());
     assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());
     killTask(taskId);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
 
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
     verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
@@ -342,8 +424,7 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     killTask(taskId);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
   }
   
@@ -357,8 +438,7 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     killTask(taskId);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(createTaskTAFailedEvent(mockTask.getLastAttempt().getID()));
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
   }
 
@@ -415,15 +495,13 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     killTask(taskId);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
 
     // Send duplicate kill for same attempt
     // This will not happen in practice but this is to simulate handling
     // of killed attempts in killed state.
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
     assertEquals(TaskStateInternal.KILLED, mockTask.getInternalState());
 
   }
@@ -437,14 +515,12 @@ public class TestTaskImpl {
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     for (int i = 0; i < mockTask.maxFailedAttempts; ++i) {
-      mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-          TaskEventType.T_ATTEMPT_FAILED));
+      mockTask.handle(createTaskTAFailedEvent(mockTask.getLastAttempt().getID()));
     }
     assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
 
     // Send kill for an attempt
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
     assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
 
   }
@@ -540,8 +616,7 @@ public class TestTaskImpl {
         mockTask.canCommit(mockTask.getLastAttempt().getID()));
 
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
 
     assertTaskSucceededState();
   }
@@ -562,8 +637,7 @@ public class TestTaskImpl {
         mockTask.canCommit(mockTask.getLastAttempt().getID()));
 
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
 
     assertTaskSucceededState();
   }
@@ -578,8 +652,7 @@ public class TestTaskImpl {
     TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     
     // Add a speculative task attempt that succeeds
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
     
@@ -606,8 +679,7 @@ public class TestTaskImpl {
         mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
 
     updateAttemptState(mockTask.getAttemptList().get(0), TaskAttemptState.SUCCEEDED);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0).getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(mockTask.getAttemptList().get(0).getID()));
 
     assertTaskSucceededState();
   }
@@ -620,8 +692,7 @@ public class TestTaskImpl {
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
 
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
 
     // The task should now have succeeded
     assertTaskSucceededState();
@@ -644,8 +715,9 @@ public class TestTaskImpl {
     when(mockTezEvent.getSourceInfo()).thenReturn(meta);
     TaskAttemptEventOutputFailed outputFailedEvent = 
         new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
-        .getID(), TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
+    mockTask.handle(
+        createTaskTAFailedEvent(mockTask.getLastAttempt().getID(), TaskFailureType.NON_FATAL,
+            outputFailedEvent));
 
     // The task should still be in the scheduled state
     assertTaskScheduledState();
@@ -669,8 +741,7 @@ public class TestTaskImpl {
     launchTaskAttempt(mockTask.getLastAttempt().getID());
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
 
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(mockTask.getLastAttempt().getID()));
 
     // The task should now have succeeded
     assertTaskSucceededState();
@@ -679,8 +750,7 @@ public class TestTaskImpl {
 
     eventHandler.events.clear();
     // Now kill the attempt after it has succeeded
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
-        .getID(), TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
 
     // The task should still be in the scheduled state
     assertTaskScheduledState();
@@ -721,8 +791,7 @@ public class TestTaskImpl {
     updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
 
     // Add a speculative task attempt
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
     MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
     launchTaskAttempt(specAttempt.getID());
     updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
@@ -730,15 +799,13 @@ public class TestTaskImpl {
 
     // Fail the first attempt
     updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
-    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(createTaskTAFailedEvent(firstAttempt.getID()));
     assertEquals(TaskState.FAILED, mockTask.getState());
     assertEquals(2, mockTask.getAttemptList().size());
 
     // Now fail the speculative attempt
     updateAttemptState(specAttempt, TaskAttemptState.FAILED);
-    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(createTaskTAFailedEvent(specAttempt.getID()));
     assertEquals(TaskState.FAILED, mockTask.getState());
     assertEquals(2, mockTask.getAttemptList().size());
   }
@@ -757,8 +824,7 @@ public class TestTaskImpl {
     updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
 
     // Add a speculative task attempt
-    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
-        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
     MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
     launchTaskAttempt(specAttempt.getID());
     updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
@@ -766,15 +832,13 @@ public class TestTaskImpl {
 
     // Fail the first attempt
     updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
-    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
-        TaskEventType.T_ATTEMPT_FAILED));
+    mockTask.handle(createTaskTAFailedEvent(firstAttempt.getID()));
     assertEquals(TaskState.FAILED, mockTask.getState());
     assertEquals(2, mockTask.getAttemptList().size());
 
     // Now succeed the speculative attempt
     updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
-    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(specAttempt.getID()));
     assertEquals(TaskState.FAILED, mockTask.getState());
     assertEquals(2, mockTask.getAttemptList().size());
   }
@@ -788,8 +852,7 @@ public class TestTaskImpl {
     updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
 
     // Add a speculative task attempt
-    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
-        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID()));
     MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
     launchTaskAttempt(specAttempt.getID());
     updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
@@ -797,8 +860,7 @@ public class TestTaskImpl {
 
     // Have the first task succeed
     eventHandler.events.clear();
-    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    mockTask.handle(createTaskTASucceededEvent(firstAttempt.getID()));
 
     // The task should now have succeeded and sent kill to other attempt
     assertTaskSucceededState();
@@ -811,8 +873,7 @@ public class TestTaskImpl {
         ((TaskAttemptEventKillRequest) event).getTaskAttemptID());
 
     // Emulate the spec attempt being killed
-    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
-        TaskEventType.T_ATTEMPT_KILLED));
+    mockTask.handle(createTaskTAKilledEvent(specAttempt.getID()));
     assertTaskSucceededState();
 
     // Now fail the attempt after it has succeeded
@@ -823,8 +884,7 @@ public class TestTaskImpl {
     TaskAttemptEventOutputFailed outputFailedEvent =
         new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
     eventHandler.events.clear();
-    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
-        TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
+    mockTask.handle(createTaskTAFailedEvent(firstAttempt.getID(), TaskFailureType.NON_FATAL, outputFailedEvent));
 
     // The task should still be in the scheduled state
     assertTaskScheduledState();

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 7a20a37..d2d9a07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -53,8 +53,10 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
+import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -147,7 +149,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
-import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
@@ -3472,6 +3473,7 @@ public class TestVertexImpl {
     ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
     ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+        TaskFailureType.NON_FATAL,
         "diag", TaskAttemptTerminationCause.APPLICATION_ERROR));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
@@ -3506,6 +3508,7 @@ public class TestVertexImpl {
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
 
     ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+        TaskFailureType.NON_FATAL,
         "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
@@ -3541,6 +3544,7 @@ public class TestVertexImpl {
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
 
     ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+        TaskFailureType.NON_FATAL,
         "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
@@ -4643,7 +4647,7 @@ public class TestVertexImpl {
       TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
       TaskImpl task = (TaskImpl)v1.getTask(taskId);
       task.handle(new TaskEvent(taskId, TaskEventType.T_ATTEMPT_LAUNCHED));
-      task.handle(new TaskEventTAUpdate(taskAttemptId, TaskEventType.T_ATTEMPT_SUCCEEDED));
+      task.handle(new TaskEventTASucceeded(taskAttemptId));
       v1.handle(new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
       v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
       dispatcher.await();
@@ -6242,7 +6246,7 @@ public class TestVertexImpl {
 
     // at least one task attempt is succeed, otherwise input initialize events won't been handled.
     dispatcher.getEventHandler().handle(new TaskEvent(t0_v1, TaskEventType.T_ATTEMPT_LAUNCHED));
-    dispatcher.getEventHandler().handle(new TaskEventTAUpdate(ta0_t0_v1, TaskEventType.T_ATTEMPT_SUCCEEDED));
+    dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta0_t0_v1));
     dispatcher.getEventHandler()
         .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
     dispatcher.await();
@@ -6323,10 +6327,8 @@ public class TestVertexImpl {
 
     TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
 
-    dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0),
-        TaskEventType.T_ATTEMPT_LAUNCHED));
-    dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0),
-        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    dispatcher.getEventHandler().handle(new TaskEventTALaunched(TezTaskAttemptID.getInstance(t1, 0)));
+    dispatcher.getEventHandler().handle(new TaskEventTASucceeded(TezTaskAttemptID.getInstance(t1, 0)));
     dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 38d9935..5e7e906 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
 
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -60,25 +61,19 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.runtime.api.InputSpecUpdate;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -494,12 +489,13 @@ public class TestHistoryEventsProtoConversion {
     logEvents(event, deserializedEvent);
   }
 
+  @SuppressWarnings("deprecation")
   private void testTaskAttemptFinishedEvent() throws Exception {
     {
       TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
-          "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
+          "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, TaskFailureType.FATAL,
           null, null, null, null, null, 2048,
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024,
@@ -533,6 +529,8 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getNodeId());
       Assert.assertEquals(event.getNodeHttpAddress(),
           deserializedEvent.getNodeHttpAddress());
+      Assert.assertEquals(event.getTaskFailureType(),
+          deserializedEvent.getTaskFailureType());
       logEvents(event, deserializedEvent);
     }
     {
@@ -545,7 +543,7 @@ public class TestHistoryEventsProtoConversion {
       TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
-          "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
+          "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, TaskFailureType.NON_FATAL,
           TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events,
           null, 0, null, 0,
           ContainerId.newInstance(
@@ -575,6 +573,53 @@ public class TestHistoryEventsProtoConversion {
       Assert.assertEquals(events.size(), event.getDataEvents().size());
       Assert.assertEquals(events.get(0).getTimestamp(), event.getDataEvents().get(0).getTimestamp());
       Assert.assertEquals(events.get(0).getTaskAttemptId(), event.getDataEvents().get(0).getTaskAttemptId());
+      Assert.assertEquals(event.getTaskFailureType(), deserializedEvent.getTaskFailureType());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      // KILLED attempt with no TaskFailureType (null): the proto round trip must preserve it.
+      TezTaskAttemptID taId =
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0);
+      long timestamp = 1024L;
+      List<DataEventDependencyInfo> events = Lists.newArrayList();
+      events.add(new DataEventDependencyInfo(timestamp, taId));
+      events.add(new DataEventDependencyInfo(timestamp, taId));
+      TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          "vertex1", 10001L, 1000434444L, TaskAttemptState.KILLED, null,
+          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events,
+          null, 0, null, 0,
+          ContainerId.newInstance(
+              ApplicationAttemptId.newInstance(
+                  ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+          "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
+      TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskAttemptID(), deserializedEvent.getTaskAttemptID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(),
+          deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getCounters(),
+          deserializedEvent.getCounters());
+      Assert.assertEquals(event.getContainerId(),
+          deserializedEvent.getContainerId());
+      Assert.assertEquals(event.getNodeId(),
+          deserializedEvent.getNodeId());
+      Assert.assertEquals(event.getNodeHttpAddress(),
+          deserializedEvent.getNodeHttpAddress());
+      Assert.assertEquals(event.getTaskAttemptError(),
+          deserializedEvent.getTaskAttemptError());
+      Assert.assertEquals(events.size(), deserializedEvent.getDataEvents().size());
+      Assert.assertEquals(events.get(0).getTimestamp(),
+          deserializedEvent.getDataEvents().get(0).getTimestamp());
+      Assert.assertEquals(events.get(0).getTaskAttemptId(),
+          deserializedEvent.getDataEvents().get(0).getTaskAttemptId());
+      Assert.assertEquals(event.getTaskFailureType(), deserializedEvent.getTaskFailureType());
       logEvents(event, deserializedEvent);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index ea683f7..5c596c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
+              random.nextInt(), TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
               null, null, null, null, 0, null, 0,
               containerId, nodeId, null, null, "nodeHttpAddress");
           break;

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
index 5ae416a..9fe3ee0 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -50,16 +51,23 @@ public class TestUmbilical implements TezUmbilical {
     }
   }
 
-  public List<TezEvent> getEvents() {
-    return this.events;
+  @Override
+  public void signalFailure(TezTaskAttemptID taskAttemptID, TaskFailureType taskFailureType, Throwable t,
+                            String message, EventMetaData sourceInfo) {
+    LOG.info("Received failure from task: " + taskAttemptID
+        + ", Message: " + message
+        + ", taskFailureType=" + taskFailureType);
   }
 
   @Override
-  public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, 
-      String message, EventMetaData sourceInfo) {
-    LOG.info("Received fatal error from task: " + taskAttemptID
+  public void signalKillSelf(TezTaskAttemptID taskAttemptID, Throwable t, String message,
+                             EventMetaData sourceInfo) {
+    LOG.info("Received kill from task: " + taskAttemptID
         + ", Message: " + message);
+  }
 
+  public List<TezEvent> getEvents() {
+    return this.events;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 26d4d98..1e77ce8 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -448,6 +448,9 @@ public class HistoryEventTimelineConversion {
 
     atsEntity.addPrimaryFilter(ATSConstants.STATUS, event.getState().name());
 
+    if (event.getTaskFailureType() != null) {
+      atsEntity.addOtherInfo(ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name());
+    }
     atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime());
     atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index c5badaa..abfd757 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.tez.client.CallerContext;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.counters.TezCounters;
@@ -83,6 +82,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.codehaus.jettison.json.JSONException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -183,7 +183,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST,
+              random.nextInt(), TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.OUTPUT_LOST,
               null, null, null, null, 0, null, 0,
               containerId, nodeId, null, null, "nodeHttpAddress");
           break;
@@ -519,7 +519,7 @@ public class TestHistoryEventTimelineConversion {
     events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
 
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
-        startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime,
+        startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events, null, creationTime,
         tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
@@ -543,7 +543,7 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, evt.getTimestamp());
 
     final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
-    Assert.assertEquals(16, otherInfo.size());
+    Assert.assertEquals(17, otherInfo.size());
     Assert.assertEquals(tezTaskAttemptID.toString(), 
         timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
     Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
@@ -552,6 +552,7 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
     Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
     Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
+    Assert.assertEquals(TaskFailureType.FATAL.name(), otherInfo.get(ATSConstants.TASK_FAILURE_TYPE));
     Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
     Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
     Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS);
@@ -565,6 +566,17 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(nodeId.toString(), otherInfo.get(ATSConstants.NODE_ID));
     Assert.assertEquals(containerId.toString(), otherInfo.get(ATSConstants.CONTAINER_ID));
     Assert.assertEquals("nodeHttpAddress", otherInfo.get(ATSConstants.NODE_HTTP_ADDRESS));
+
+    TaskAttemptFinishedEvent eventWithNullFailureType =
+        new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
+            startTime, finishTime, state, null, error, diagnostics, counters, events, null,
+            creationTime,
+            tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL",
+            "nodeHttpAddress");
+    TimelineEntity timelineEntityWithNullFailureType =
+        HistoryEventTimelineConversion.convertToTimelineEntity(eventWithNullFailureType);
+    Assert.assertNull(
+        timelineEntityWithNullFailureType.getOtherInfo().get(ATSConstants.TASK_FAILURE_TYPE));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index 86869e3..bd90c77 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -105,7 +105,7 @@
               <source>
                 <directory>${basedir}/src/main/proto</directory>
                 <includes>
-                  <include>Events.proto</include>
+                  <include>RuntimeEvents.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
index d4d7ca9..02dc69c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
@@ -24,12 +24,15 @@ import java.net.URISyntaxException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskFailureTypeProto;
 
 public class TezConverterUtils {
 
   /**
    * return a {@link URI} from a given url
-   * 
+   *
    * @param url
    *          url to convert
    * @return path from {@link URL}
@@ -59,7 +62,51 @@ public class TezConverterUtils {
     return new TezLocalResource(getURIFromYarnURL(lr.getResource()), lr.getSize(),
         lr.getTimestamp());
   }
-  
+
+  /**
+   * Converts a {@link TaskFailureTypeProto} into the corresponding {@link TaskFailureType}.
+   *
+   * @param proto the failure type carried in a protobuf message; must not be null
+   * @return the matching {@link TaskFailureType}
+   * @throws TezUncheckedException if {@code proto} is null or has no known mapping
+   */
+  public static TaskFailureType failureTypeFromProto(TaskFailureTypeProto proto) {
+    if (proto == null) {
+      // Report a descriptive error rather than the bare NPE a switch on null would throw.
+      throw new TezUncheckedException("FailureTypeProto must be specified");
+    }
+    switch (proto) {
+      case FT_NON_FATAL:
+        return TaskFailureType.NON_FATAL;
+      case FT_FATAL:
+        return TaskFailureType.FATAL;
+      default:
+        throw new TezUncheckedException("Unknown FailureTypeProto: " + proto);
+    }
+  }
+
+  /**
+   * Converts a {@link TaskFailureType} into its protobuf representation.
+   *
+   * @param taskFailureType the failure type to convert; must not be null
+   * @return the matching {@link TaskFailureTypeProto}
+   * @throws TezUncheckedException if {@code taskFailureType} is null or has no known mapping
+   */
+  public static TaskFailureTypeProto failureTypeToProto(TaskFailureType taskFailureType) {
+    if (taskFailureType == null) {
+      // Report a descriptive error rather than the bare NPE a switch on null would throw.
+      throw new TezUncheckedException("TaskFailureType must be specified");
+    }
+    switch (taskFailureType) {
+      case NON_FATAL:
+        return TaskFailureTypeProto.FT_NON_FATAL;
+      case FATAL:
+        return TaskFailureTypeProto.FT_FATAL;
+      default:
+        throw new TezUncheckedException("Unknown FailureType: " + taskFailureType);
+    }
+  }
+
   // @Private
   // public static void writeLocalResource(LocalResource lr, DataOutput out)
   // throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 07f92c2..a31136b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -46,6 +46,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.slf4j.Logger;
@@ -386,6 +387,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     } finally {
       setTaskDone();
+      // Clear the interrupt status since the task execution is done.
+      Thread.interrupted();
       if (eventRouterThread != null) {
         eventRouterThread.interrupt();
         LOG.info("Joining on EventRouter");
@@ -719,13 +722,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
     } catch (Throwable t) {
       LOG.warn("Failed to handle event", t);
-      setFatalError(t, "Failed to handle event");
+      registerError();
       EventMetaData sourceInfo = new EventMetaData(
           e.getDestinationInfo().getEventGenerator(),
           taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
           getTaskAttemptID());
       setFrameworkCounters();
-      tezUmbilical.signalFatalError(getTaskAttemptID(),
+      // Signal such errors as NON_FATAL (retriable). User code may already have reported this
+      // with a different TaskFailureType before control got back here.
+      // TODO: Don't catch Throwables.
+      tezUmbilical.signalFailure(getTaskAttemptID(), TaskFailureType.NON_FATAL,
           t, ExceptionUtils.getStackTrace(t), sourceInfo);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 59c8104..7b86d4b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -38,9 +38,7 @@ import com.google.common.collect.Maps;
 
 public abstract class RuntimeTask {
 
-  protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
-  protected AtomicReference<Throwable> fatalError = new AtomicReference<Throwable>();
-  protected String fatalErrorMessage = null;
+  protected final AtomicBoolean errorReported = new AtomicBoolean(false);
   protected float progress;
   protected final TezCounters tezCounters;
   private final Map<String, TezCounters> counterMap = Maps.newConcurrentMap();
@@ -99,10 +97,8 @@ public abstract class RuntimeTask {
     return taskSpec.getVertexName();
   }
 
-  public void setFatalError(Throwable t, String message) {
-    hasFatalError.set(true);
-    this.fatalError.set(t);
-    this.fatalErrorMessage = message;
+  public void registerError() {
+    errorReported.set(true);
   }
   
   public final void notifyProgressInvocation() {
@@ -113,13 +109,9 @@ public abstract class RuntimeTask {
     boolean retVal = progressNotified.getAndSet(false);
     return retVal;
   }
-  
-  public Throwable getFatalError() {
-    return this.fatalError.get();
-  }
 
-  public boolean hadFatalError() {
-    return hasFatalError.get();
+  public boolean wasErrorReported() {
+    return errorReported.get();
   }
 
   public synchronized void setProgress(float progress) {

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
index 935fdbb..5606663 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
@@ -18,18 +18,25 @@
 
 package org.apache.tez.runtime.api.events;
 
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.Event;
 
 public class TaskAttemptFailedEvent extends Event {
 
   private final String diagnostics;
+  private final TaskFailureType taskFailureType;
 
-  public TaskAttemptFailedEvent(String diagnostics) {
+  public TaskAttemptFailedEvent(String diagnostics, TaskFailureType taskFailureType) {
     this.diagnostics = diagnostics;
+    this.taskFailureType = taskFailureType;
   }
 
   public String getDiagnostics() {
     return diagnostics;
   }
 
+  public TaskFailureType getTaskFailureType() {
+    return taskFailureType;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java
new file mode 100644
index 0000000..3f5b326
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptKilledEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Event describing a task attempt that was killed, as opposed to one that failed
+ * (see {@link TaskAttemptFailedEvent}). It carries a single diagnostic message and is
+ * serialized by {@code TezEvent} under {@code EventType.TASK_ATTEMPT_KILLED_EVENT}.
+ */
+public class TaskAttemptKilledEvent extends Event {
+
+  /** Diagnostic text explaining the kill, stored exactly as supplied. */
+  private final String diagnostics;
+
+  /**
+   * Creates a kill event.
+   *
+   * @param diagnostics diagnostic text explaining why the attempt was killed
+   */
+  public TaskAttemptKilledEvent(String diagnostics) {
+    this.diagnostics = diagnostics;
+  }
+
+  /**
+   * @return the diagnostic text supplied at construction time
+   */
+  public String getDiagnostics() {
+    return this.diagnostics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index 6d4c902..cb247c9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.api.impl;
 public enum EventType {
   TASK_ATTEMPT_COMPLETED_EVENT,
   TASK_ATTEMPT_FAILED_EVENT,
+  TASK_ATTEMPT_KILLED_EVENT,
   DATA_MOVEMENT_EVENT,
   INPUT_READ_ERROR_EVENT,
   INPUT_FAILED_EVENT,

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 63e2b86..b3ce8c4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
@@ -41,10 +42,12 @@ import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
+import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptKilledEventProto;
 
 public class TezEvent implements Writable {
 
@@ -79,6 +82,8 @@ public class TezEvent implements Writable {
       eventType = EventType.INPUT_READ_ERROR_EVENT;
     } else if (event instanceof TaskAttemptFailedEvent) {
       eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+    } else if (event instanceof TaskAttemptKilledEvent) {
+      eventType = EventType.TASK_ATTEMPT_KILLED_EVENT;
     } else if (event instanceof TaskAttemptCompletedEvent) {
       eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
     } else if (event instanceof InputFailedEvent) {
@@ -168,8 +173,14 @@ public class TezEvent implements Writable {
         TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
         eventBytes = TaskAttemptFailedEventProto.newBuilder()
             .setDiagnostics(tfEvt.getDiagnostics())
+            .setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType()))
             .build().toByteArray();
         break;
+      case TASK_ATTEMPT_KILLED_EVENT:
+        TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event;
+        eventBytes = TaskAttemptKilledEventProto.newBuilder()
+            .setDiagnostics(tkEvent.getDiagnostics()).build().toByteArray();
+        break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
         eventBytes = TaskAttemptCompletedEventProto.newBuilder()
             .build().toByteArray();
@@ -236,7 +247,12 @@ public class TezEvent implements Writable {
       case TASK_ATTEMPT_FAILED_EVENT:
         TaskAttemptFailedEventProto tfProto =
             TaskAttemptFailedEventProto.parseFrom(eventBytes);
-        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(),
+            TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType()));
+        break;
+      case TASK_ATTEMPT_KILLED_EVENT:
+        TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(eventBytes);
+        event = new TaskAttemptKilledEvent(tkProto.getDiagnostics());
         break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
         event = new TaskAttemptCompletedEvent();
@@ -293,4 +309,12 @@ public class TezEvent implements Writable {
     }
   }
 
+  @Override
+  public String toString() {
+    return "TezEvent{"
+        + "eventType=" + eventType
+        + ", sourceInfo=" + sourceInfo
+        + ", destinationInfo=" + destinationInfo
+        + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 4431150..afb78d9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputStatisticsReporter;
@@ -152,12 +153,24 @@ public class TezInputContextImpl extends TezTaskContextImpl
     return sourceVertexName;
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);
   }
 
   @Override
+  public void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception,
+                            @Nullable String message) {
+    super.signalFailure(taskFailureType, exception, message, sourceInfo);
+  }
+
+  @Override
+  public void killSelf(@Nullable Throwable exception, @Nullable String message) {
+    super.signalKillSelf(exception, message, sourceInfo);
+  }
+
+  @Override
   public void inputIsReady() {
     if (inputReadyTracker != null) {
       inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 1e5b6a5..1bd78d3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ObjectRegistry;
@@ -137,12 +138,24 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     return destinationVertexName;
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);
   }
 
   @Override
+  public void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception,
+                            @Nullable String message) {
+    super.signalFailure(taskFailureType, exception, message, sourceInfo);
+  }
+
+  @Override
+  public void killSelf(@Nullable Throwable exception, @Nullable String message) {
+    super.signalKillSelf(exception, message, sourceInfo);
+  }
+
+  @Override
   public int getOutputIndex() {
     return outputIndex;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 0c3283d..607bbf1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -33,12 +32,12 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
@@ -97,12 +96,24 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
     notifyProgress();
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);
   }
 
   @Override
+  public void reportFailure(TaskFailureType taskFailureType, @Nullable Throwable exception,
+                            @Nullable String message) {
+    super.signalFailure(taskFailureType, exception, message, sourceInfo);
+  }
+
+  @Override
+  public void killSelf(@Nullable Throwable exception, @Nullable String message) {
+    super.signalKillSelf(exception, message, sourceInfo);
+  }
+
+  @Override
   public boolean canCommit() throws IOException {
     return tezUmbilical.canCommit(this.taskAttemptID);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index c12b334..35abd1e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -37,6 +37,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EntityDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.ObjectRegistry;
@@ -214,9 +215,21 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
   }
   
   protected void signalFatalError(Throwable t, String message, EventMetaData sourceInfo) {
+    signalFailure(TaskFailureType.NON_FATAL, t, message, sourceInfo); // old fatal-error API -> NON_FATAL failure
+  }
+
+  protected void signalFailure(TaskFailureType taskFailureType, Throwable t, String message,
+      EventMetaData sourceInfo) {
+    Preconditions.checkNotNull(taskFailureType, "TaskFailureType must be specified");
+    this.runtimeTask.setFrameworkCounters();
+    this.runtimeTask.registerError();
+    this.tezUmbilical.signalFailure(this.taskAttemptID, taskFailureType, t, message, sourceInfo);
+  }
+
+  protected void signalKillSelf(Throwable t, String message, EventMetaData sourceInfo) { // like signalFailure, but a self-kill
     runtimeTask.setFrameworkCounters();
-    runtimeTask.setFatalError(t, message);
-    tezUmbilical.signalFatalError(taskAttemptID, t, message, sourceInfo);
+    runtimeTask.registerError();
+    tezUmbilical.signalKillSelf(taskAttemptID, t, message, sourceInfo);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
index b45a9b2..b606dea 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
@@ -21,15 +21,20 @@ package org.apache.tez.runtime.api.impl;
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.TaskFailureType;
 
+@Private // framework-internal interface (InterfaceAudience.Private)
 public interface TezUmbilical {
 
-  public void addEvents(Collection<TezEvent> events);
+  void addEvents(Collection<TezEvent> events);
 
-  public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, String message,
-      EventMetaData sourceInfo);
+  void signalFailure(TezTaskAttemptID taskAttemptID, TaskFailureType taskFailureType, Throwable t, String message,
+                     EventMetaData sourceInfo); // failure; taskFailureType classifies it
 
-  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
+  void signalKillSelf(TezTaskAttemptID taskAttemptID, Throwable t, String message, EventMetaData sourceInfo);
+
+  boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
index 9a5a3ab..b7d5fb5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.task.ErrorReporter;
@@ -32,9 +33,15 @@ public interface TaskReporterInterface {
 
   boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException;
 
-  boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics, EventMetaData srcMeta) throws IOException,
+  boolean taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType,
+      Throwable cause, String diagnostics,
+      EventMetaData srcMeta)
+      throws IOException,
       TezException;
 
+  boolean taskKilled(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics,
+                     EventMetaData srcMeta) throws IOException, TezException; // reports the attempt as killed, not failed
+
   void addEvents(TezTaskAttemptID taskAttemptId, Collection<TezEvent> events);
 
   boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
index 8dc7a87..c2395e1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
@@ -17,7 +17,8 @@ package org.apache.tez.runtime.task;
 public enum EndReason {
   SUCCESS(false),
   CONTAINER_STOP_REQUESTED(false),
-  KILL_REQUESTED(true),
+  KILL_REQUESTED(true), // External kill request
+  TASK_KILL_REQUEST(false), // Kill request originating from the task itself (self-kill)
   COMMUNICATION_FAILURE(false),
   TASK_ERROR(false);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index e5370d4..d1c1471 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -39,8 +39,10 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.*;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
@@ -116,7 +118,6 @@ public class TaskReporter implements TaskReporterInterface {
   public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
     currentCallable.markComplete();
     currentCallable = null;
-    // KKK Make sure the callable completes before proceeding
   }
 
   @Override
@@ -184,7 +185,7 @@ public class TaskReporter implements TaskReporterInterface {
     @Override
     public Boolean call() throws Exception {
       // Heartbeat only for active tasks. Errors, etc will be reported directly.
-      while (!task.isTaskDone() && !task.hadFatalError()) {
+      while (!task.isTaskDone() && !task.wasErrorReported()) {
         ResponseWrapper response = heartbeat(null);
 
         if (response.shouldDie) {
@@ -209,7 +210,7 @@ public class TaskReporter implements TaskReporterInterface {
       int pendingEventCount = eventsToSend.size();
       if (pendingEventCount > 0) {
         // This is OK because the pending events will be sent via the succeeded/failed messages.
-        // TaskDone is set before taskSucceeded / taskFailed are sent out - which is what causes the
+        // TaskDone is set before taskSucceeded / taskTerminated are sent out - which is what causes the
         // thread to exit.
         LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
       }
@@ -235,7 +236,7 @@ public class TaskReporter implements TaskReporterInterface {
       List<TezEvent> events = new ArrayList<TezEvent>();
       eventsToSend.drainTo(events);
 
-      if (!task.isTaskDone() && !task.hadFatalError()) {
+      if (!task.isTaskDone() && !task.wasErrorReported()) {
         boolean sendCounters = false;
         /**
          * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
@@ -281,7 +282,7 @@ public class TaskReporter implements TaskReporterInterface {
       // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
       // are running using the same umbilical.
       int numEventsReceived = 0;
-      if (task.isTaskDone() || task.hadFatalError()) {
+      if (task.isTaskDone() || task.wasErrorReported()) {
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
           LOG.info("Current task already complete, Ignoring all events in"
               + " heartbeat response, eventCount=" + response.getEvents().size());
@@ -364,6 +365,8 @@ public class TaskReporter implements TaskReporterInterface {
     /**
      * Sends out final events for task failure.
      * @param taskAttemptID
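+     * @param isKilled true to report the attempt as killed, false to report it as failed
+     * @param taskFailureType the failure type; used only when isKilled is false (null for kills)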
      * @param t
      * @param diagnostics
      * @param srcMeta
@@ -373,8 +376,9 @@ public class TaskReporter implements TaskReporterInterface {
      * @throws TezException
      *           indicates an exception somewhere in the AM.
      */
-    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
-        EventMetaData srcMeta) throws IOException, TezException {
+    private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled, TaskFailureType taskFailureType,
+                                   Throwable t, String diagnostics,
+                                   EventMetaData srcMeta) throws IOException, TezException {
       // Ensure only one final event is ever sent.
       if (!finalEventQueued.getAndSet(true)) {
         List<TezEvent> tezEvents = new ArrayList<TezEvent>();
@@ -383,13 +387,19 @@ public class TaskReporter implements TaskReporterInterface {
         } else {
           diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
         }
-        tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics),
-            srcMeta == null ? updateEventMetadata : srcMeta));
+        if (isKilled) {
+          tezEvents.add(new TezEvent(new TaskAttemptKilledEvent(diagnostics),
+              srcMeta == null ? updateEventMetadata : srcMeta));
+        } else {
+          tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics,
+              taskFailureType),
+              srcMeta == null ? updateEventMetadata : srcMeta));
+        }
         try {
           tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata));
         } catch (Exception e) {
           // Counter may exceed limitation
-          LOG.warn("Error when get constructing TaskStatusUpdateEvent");
+          LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out");
         }
         return !heartbeat(tezEvents).shouldDie;
       } else {
@@ -432,9 +442,18 @@ public class TaskReporter implements TaskReporterInterface {
   }
 
   @Override
-  public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
-      EventMetaData srcMeta) throws IOException, TezException {
-    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID,
+      TaskFailureType taskFailureType, Throwable t, String diagnostics, EventMetaData srcMeta)
+      throws IOException, TezException {
+    final boolean killed = false; // report as a failure, not a kill
+    return currentCallable.taskTerminated(taskAttemptID, killed, taskFailureType, t,
+        diagnostics, srcMeta);
+  }
+  // Synchronized like taskFailed/unregisterTask so currentCallable is not read while being cleared.
+  @Override
+  public synchronized boolean taskKilled(TezTaskAttemptID taskAttemptID, Throwable t,
+      String diagnostics, EventMetaData srcMeta) throws IOException, TezException {
+    return currentCallable.taskTerminated(taskAttemptID, true, null, t, diagnostics, srcMeta);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index ffbed8c..8e634fa 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -40,6 +40,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
   private final LogicalIOProcessorRuntimeTask task;
   private final UserGroupInformation ugi;
   private final AtomicBoolean stopRequested = new AtomicBoolean(false);
+  private final AtomicBoolean interruptAttempted = new AtomicBoolean(false);
 
   private volatile Thread ownThread;
 
@@ -116,11 +117,17 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
   }
 
 
-  public void interruptTask() {
-    // Ensure the task is only interrupted once.
+  public void abortTask() { // aborts at most once (stopRequested guard); interrupting is done by interruptTask
     if (!stopRequested.getAndSet(true)) {
       task.abortTask();
-      if (ownThread != null) {
+    }
+  }
+
+  public void interruptTask() {
+    if (!interruptAttempted.getAndSet(true)) {
+      LogicalIOProcessorRuntimeTask localTask = task;
+      // Send an interrupt only if the task is not done.
+      if (ownThread != null && (localTask != null && !localTask.isTaskDone())) {
         ownThread.interrupt();
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
index 07b32ce..bf30615 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
@@ -14,21 +14,30 @@
 
 package org.apache.tez.runtime.task;
 
+import org.apache.tez.runtime.api.TaskFailureType;
+
 public class TaskRunner2Result {
   final EndReason endReason;
+  final TaskFailureType taskFailureType;
   final Throwable error;
   final boolean containerShutdownRequested;
 
-  public TaskRunner2Result(EndReason endReason, Throwable error, boolean containerShutdownRequested) {
+  public TaskRunner2Result(EndReason endReason, TaskFailureType taskFailureType,
+                           Throwable error, boolean containerShutdownRequested) {
     this.endReason = endReason;
     this.error = error;
     this.containerShutdownRequested = containerShutdownRequested;
+    this.taskFailureType = taskFailureType; // NOTE(review): presumably only meaningful when endReason != SUCCESS -- confirm
   }
 
   public EndReason getEndReason() {
     return endReason;
   }
 
+  public TaskFailureType getTaskFailureType() {
+    return this.taskFailureType;
+  }
+
   public Throwable getError() {
     return error;
   }
@@ -39,10 +48,15 @@ public class TaskRunner2Result {
 
   @Override
   public String toString() {
-    return "TaskRunner2Result{" +
-        "endReason=" + endReason +
-        ", error=" + error +
-        ", containerShutdownRequested=" + containerShutdownRequested +
-        '}';
+    StringBuilder sb = new StringBuilder();
+    sb.append("TaskRunner2Result{");
+    sb.append("endReason=").append(endReason);
+    sb.append(", containerShutdownRequested=").append(containerShutdownRequested);
+    if (endReason != EndReason.SUCCESS) {
+      sb.append(", failureType=").append(taskFailureType);
+      sb.append(", error=").append(error);
+    }
+    sb.append("}");
+    return sb.toString();
   }
 }


Mime
View raw message