tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [23/50] [abbrv] tez git commit: TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons. (sseth)
Date Fri, 14 Aug 2015 20:58:41 GMT
TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2c19f3c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2c19f3c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2c19f3c3

Branch: refs/heads/TEZ-2003
Commit: 2c19f3c3e8caaffad35a8b04b9da1fb4b58e329a
Parents: 34633d9
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu May 28 02:01:04 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Aug 14 13:46:44 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../org/apache/tez/common/TezUtilsInternal.java | 31 ++++++++-----
 .../apache/tez/dag/api/ContainerEndReason.java  | 27 +++++++++++
 .../tez/dag/api/TaskAttemptEndReason.java       | 13 +++---
 .../apache/tez/dag/api/TaskCommunicator.java    | 11 +++--
 .../apache/tez/dag/app/TaskAttemptListener.java |  6 ++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  9 ++--
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  6 ++-
 .../rm/container/AMContainerEventCompleted.java | 41 +++++++++++++++++
 .../dag/app/rm/container/AMContainerImpl.java   | 35 ++++++++-------
 .../app/TestTaskAttemptListenerImplTezDag.java  |  8 ++--
 .../dag/app/rm/container/TestAMContainer.java   | 47 +++++++++++---------
 .../TezTestServiceTaskCommunicatorImpl.java     |  9 ++--
 .../apache/tez/runtime/task/TezTaskRunner2.java | 20 +++++++--
 14 files changed, 186 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d651960..e333832 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -28,5 +28,6 @@ ALL CHANGES:
   TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
   TEZ-2465. Retrun the status of a kill request in TaskRunner2.
   TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
+  TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 347a4f6..0bdeb79 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -246,10 +246,16 @@ public class TezUtilsInternal {
         return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
       case SERVICE_BUSY:
         return TaskAttemptTerminationCause.SERVICE_BUSY;
-      case INTERRUPTED_BY_SYSTEM:
-        return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM;
-      case INTERRUPTED_BY_USER:
-        return TaskAttemptTerminationCause.INTERRUPTED_BY_USER;
+      case INTERNAL_PREEMPTION:
+        return TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
+      case EXTERNAL_PREEMPTION:
+        return TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
+      case APPLICATION_ERROR:
+        return TaskAttemptTerminationCause.APPLICATION_ERROR;
+      case FRAMEWORK_ERROR:
+        return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+      case NODE_FAILED:
+        return TaskAttemptTerminationCause.NODE_FAILED;
       case OTHER:
         return TaskAttemptTerminationCause.UNKNOWN_ERROR;
       default:
@@ -267,20 +273,24 @@ public class TezUtilsInternal {
         return TaskAttemptEndReason.COMMUNICATION_ERROR;
       case SERVICE_BUSY:
         return TaskAttemptEndReason.SERVICE_BUSY;
+      case INTERNAL_PREEMPTION:
+        return TaskAttemptEndReason.INTERNAL_PREEMPTION;
+      case EXTERNAL_PREEMPTION:
+        return TaskAttemptEndReason.EXTERNAL_PREEMPTION;
+      case APPLICATION_ERROR:
+        return TaskAttemptEndReason.APPLICATION_ERROR;
+      case FRAMEWORK_ERROR:
+        return TaskAttemptEndReason.FRAMEWORK_ERROR;
+      case NODE_FAILED:
+        return TaskAttemptEndReason.NODE_FAILED;
       case INTERRUPTED_BY_SYSTEM:
-        return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM;
       case INTERRUPTED_BY_USER:
-        return TaskAttemptEndReason.INTERRUPTED_BY_USER;
       case UNKNOWN_ERROR:
       case TERMINATED_BY_CLIENT:
       case TERMINATED_AT_SHUTDOWN:
-      case INTERNAL_PREEMPTION:
-      case EXTERNAL_PREEMPTION:
       case TERMINATED_INEFFECTIVE_SPECULATION:
       case TERMINATED_EFFECTIVE_SPECULATION:
       case TERMINATED_ORPHANED:
-      case APPLICATION_ERROR:
-      case FRAMEWORK_ERROR:
       case INPUT_READ_ERROR:
       case OUTPUT_WRITE_ERROR:
       case OUTPUT_LOST:
@@ -288,7 +298,6 @@ public class TezUtilsInternal {
       case CONTAINER_LAUNCH_FAILED:
       case CONTAINER_EXITED:
       case CONTAINER_STOPPED:
-      case NODE_FAILED:
       case NODE_DISK_ERROR:
       default:
         return TaskAttemptEndReason.OTHER;

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
new file mode 100644
index 0000000..e13e886
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+// TODO TEZ-2003 Expose as a public API
+public enum ContainerEndReason {
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  LAUNCH_FAILED, // Failure to launch the container
+  COMPLETED, // Completed via normal flow
+  OTHER
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
index 96a4768..de78d21 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
@@ -16,9 +16,12 @@ package org.apache.tez.dag.api;
 
 // TODO TEZ-2003 Expose as a public API
 public enum TaskAttemptEndReason {
-  COMMUNICATION_ERROR,
-  SERVICE_BUSY,
-  INTERRUPTED_BY_SYSTEM,
-  INTERRUPTED_BY_USER,
-  OTHER
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  COMMUNICATION_ERROR, // Communication error with the task
+  SERVICE_BUSY, // External service busy
+  INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+  EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  OTHER // Unknown reason
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 2651013..d0a006b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -36,7 +36,10 @@ public abstract class TaskCommunicator extends AbstractService {
   public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
 
   // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
-  public abstract void registerContainerEnd(ContainerId containerId);
+  public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
+
+  // TODO TEZ-2003 Provide additional information like reason for container end / Task end.
+  // Was it caused by preemption - or as a result of a general task completion / container completion
 
   // TODO TEZ-2003 TaskSpec breakup into a clean interface
   // TODO TEZ-2003 Add support for priority
@@ -48,11 +51,7 @@ public abstract class TaskCommunicator extends AbstractService {
   // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
 
   // TODO TEZ-2003 Remove reference to TaskAttemptID
-  // TODO TEZ-2003 This needs some information about why the attempt is being unregistered.
-  // e.g. preempted in which case the task may need to be informed. Alternately as a result of
-  // a failed task.
-  // In case of preemption - a killTask API is likely a better bet than trying to overload this method.
-  public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
+  public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason);
 
   // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.
   public abstract InetSocketAddress getAddress();

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index e4dad27..92e38ae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -21,6 +21,8 @@ package org.apache.tez.dag.app;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -34,9 +36,9 @@ public interface TaskAttemptListener {
 
   void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
   
-  void unregisterRunningContainer(ContainerId containerId, int taskCommId);
+  void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason);
   
-  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId);
+  void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason);
 
   void dagComplete(DAG dag);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index db78fa9..1c61a0d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -355,7 +356,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void unregisterRunningContainer(ContainerId containerId, int taskCommId) {
+  public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
     }
@@ -363,7 +364,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicators[taskCommId].registerContainerEnd(containerId);
+    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason);
   }
 
   @Override
@@ -404,7 +405,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) {
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) {
     ContainerId containerId = registeredAttempts.remove(attemptId);
     if (containerId == null) {
       LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -418,7 +419,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId);
+    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index accde2c..3774eb4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -41,6 +41,8 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -185,7 +187,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId) {
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
     ContainerInfo containerInfo = registeredContainers.remove(containerId);
     if (containerInfo != null) {
       synchronized(containerInfo) {
@@ -231,7 +233,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
     TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
     ContainerId containerId = attemptToContainerMap.remove(taskAttempt);
     if(containerId == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index 9bb6d7f..8ef2a83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 public class AMContainerEventCompleted extends AMContainerEvent {
@@ -61,4 +62,44 @@ public class AMContainerEventCompleted extends AMContainerEvent {
     return errCause;
   }
 
+  public ContainerEndReason getContainerEndReason() {
+    if (errCause != null) {
+      switch (errCause) {
+        case INTERNAL_PREEMPTION:
+          return ContainerEndReason.INTERNAL_PREEMPTION;
+        case EXTERNAL_PREEMPTION:
+          return ContainerEndReason.EXTERNAL_PREEMPTION;
+        case FRAMEWORK_ERROR:
+          return ContainerEndReason.FRAMEWORK_ERROR;
+        case APPLICATION_ERROR:
+          return ContainerEndReason.APPLICATION_ERROR;
+        case CONTAINER_LAUNCH_FAILED:
+          return ContainerEndReason.LAUNCH_FAILED;
+        case NODE_FAILED:
+          return ContainerEndReason.NODE_FAILED;
+        case CONTAINER_EXITED:
+          return ContainerEndReason.COMPLETED;
+        case UNKNOWN_ERROR:
+        case TERMINATED_BY_CLIENT:
+        case TERMINATED_AT_SHUTDOWN:
+        case TERMINATED_INEFFECTIVE_SPECULATION:
+        case TERMINATED_EFFECTIVE_SPECULATION:
+        case TERMINATED_ORPHANED:
+        case INPUT_READ_ERROR:
+        case OUTPUT_WRITE_ERROR:
+        case OUTPUT_LOST:
+        case TASK_HEARTBEAT_ERROR:
+        case CONTAINER_STOPPED:
+        case NODE_DISK_ERROR:
+        case COMMUNICATION_ERROR:
+        case SERVICE_BUSY:
+        case INTERRUPTED_BY_SYSTEM:
+        case INTERRUPTED_BY_USER:
+        default:
+          return ContainerEndReason.OTHER;
+      }
+    } else {
+      return ContainerEndReason.OTHER;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 8685556..7446734 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -27,6 +27,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.Credentials;
@@ -635,7 +638,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
             event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
       }
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED);
       container.deAllocate();
     }
   }
@@ -665,7 +668,7 @@ public class AMContainerImpl implements AMContainer {
       }
       container.containerLocalResources = null;
       container.additionalLocalResources = null;
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(event.getContainerEndReason());
       String diag = event.getDiagnostics();
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
@@ -691,7 +694,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatingToTaskAttempt(container.currentAttempt,
             getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
       }
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.OTHER);
       container.logStopped(container.currentAttempt == null ?
           ContainerExitStatus.SUCCESS 
           : ContainerExitStatus.INVALID);
@@ -743,7 +746,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED);
       container.deAllocate();
     }
   }
@@ -760,7 +763,7 @@ public class AMContainerImpl implements AMContainer {
                 container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
       }
       container.logStopped(ContainerExitStatus.ABORTED);
-      container.unregisterFromTAListener();
+      container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
       container.sendStopRequestToNM();
     }
   }
@@ -832,7 +835,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
 
       AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
       container.handleExtraTAAssign(event, container.currentAttempt);
     }
   }
@@ -843,7 +846,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       container.lastTaskFinishTime = System.currentTimeMillis();
       container.completedAttempts.add(container.currentAttempt);
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
       container.currentAttempt = null;
     }
   }
@@ -860,7 +863,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendTerminatedToTaskAttempt(container.currentAttempt,
             getMessage(container, event), event.getTerminationCause());
       }
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()));
       container.registerFailedAttempt(container.currentAttempt);
       container.currentAttempt= null;
       super.transition(container, cEvent);
@@ -870,7 +873,7 @@ public class AMContainerImpl implements AMContainer {
   protected static class StopRequestAtRunningTransition
       extends StopRequestAtIdleTransition {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
       super.transition(container, cEvent);
     }
   }
@@ -891,7 +894,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED);
     }
   }
 
@@ -900,7 +903,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
-      container.unregisterAttemptFromListener(container.currentAttempt);
+      container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
       container.sendTerminatingToTaskAttempt(container.currentAttempt,
           "Container " + container.getContainerId() +
               " hit an invalid transition - " + cEvent.getType() + " at " +
@@ -1026,7 +1029,7 @@ public class AMContainerImpl implements AMContainer {
     LOG.warn(errorMessage);
     this.logStopped(ContainerExitStatus.INVALID);
     this.sendStopRequestToNM();
-    this.unregisterFromTAListener();
+    this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
     this.unregisterFromContainerListener();
   }
 
@@ -1084,8 +1087,8 @@ public class AMContainerImpl implements AMContainer {
         container.getNodeId(), container.getContainerToken(), launcherId));
   }
 
-  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
-    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId);
+  protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {
+    taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason);
   }
 
   protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
@@ -1096,8 +1099,8 @@ public class AMContainerImpl implements AMContainer {
     taskAttemptListener.registerRunningContainer(containerId, taskCommId);
   }
 
-  protected void unregisterFromTAListener() {
-    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId);
+  protected void unregisterFromTAListener(ContainerEndReason endReason) {
+    this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason);
   }
 
   protected void registerWithContainerListener() {

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 34b9792..68d3baf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,6 +47,8 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
@@ -163,12 +165,12 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
     // Container unregistered. Should send a shouldDie = true
-    taskAttemptListener.unregisterRunningContainer(containerId2, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertTrue(containerTask.shouldDie());
 
@@ -182,7 +184,7 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
     AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
     taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
-    taskAttemptListener.unregisterRunningContainer(containerId3, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER);
     containerTask = tezUmbilical.getTask(containerContext3);
     assertTrue(containerTask.shouldDie());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 44dcd1f..322eabc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
@@ -135,14 +137,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -184,13 +186,13 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
 
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -235,7 +237,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
 
     TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taId2);
@@ -250,14 +252,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(taId2, 0);
+    verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -290,7 +292,7 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -327,7 +329,7 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -350,7 +352,7 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -388,7 +390,7 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -424,7 +426,7 @@ public class TestAMContainer {
 
     wc.containerTimedOut();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -458,7 +460,7 @@ public class TestAMContainer {
 
     wc.stopRequest();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -492,7 +494,7 @@ public class TestAMContainer {
     wc.launchFailed();
     wc.verifyState(AMContainerState.STOPPING);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -542,7 +544,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -572,7 +574,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -603,7 +605,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -634,7 +636,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -663,7 +665,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -698,7 +700,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -735,7 +737,8 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0,
+        ContainerEndReason.INTERNAL_PREEMPTION);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -772,7 +775,7 @@ public class TestAMContainer {
     wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index cf28b11..98673a6 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
@@ -98,8 +99,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId) {
-    super.registerContainerEnd(containerId);
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+    super.registerContainerEnd(containerId, endReason);
   }
 
   @Override
@@ -175,8 +176,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
-    super.unregisterRunningTaskAttempt(taskAttemptID);
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
+    super.unregisterRunningTaskAttempt(taskAttemptID, endReason);
     // Nothing else to do for now. The push API in the test does not support termination of a running task
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2c19f3c3/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 3bf9f84..15629fd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -261,7 +261,13 @@ public class TezTaskRunner2 {
             taskRunnerCallable.interruptTask();
           }
           return true;
+        } else {
+          LOG.info("Ignoring killTask request for {} since end reason is already set to {}",
+              task.getTaskAttemptID(), firstEndReason);
         }
+      } else {
+        LOG.info("Ignoring killTask request for {} since it is not in a running state",
+            task.getTaskAttemptID());
       }
     }
     return false;
@@ -389,10 +395,18 @@ public class TezTaskRunner2 {
         isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
         // Respect stopContainerRequested since it can come in at any point, despite a previous failure.
         stopContainerRequested.set(true);
-      }
 
-      if (isFirstTerminate) {
-        killTask();
+        if (isFirstTerminate) {
+          LOG.info("Attempting to abort {} since a shutdown request was received",
+              task.getTaskAttemptID());
+          if (taskRunnerCallable != null) {
+            taskKillStartTime = System.currentTimeMillis();
+            taskRunnerCallable.interruptTask();
+          }
+        } else {
+          LOG.info("Not acting on shutdown request for {} since the task is not in running state",
+              task.getTaskAttemptID());
+        }
       }
     }
   }


Mime
View raw message