tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [3/3] tez git commit: TEZ-2669. Propagation of errors from plugins to the AM for error reporting. Contributed by Hitesh Shah and Siddharth Seth.
Date Tue, 12 Jan 2016 23:20:06 GMT
TEZ-2669. Propagation of errors from plugins to the AM for error
reporting. Contributed by Hitesh Shah and Siddharth Seth.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d765431
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d765431
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d765431

Branch: refs/heads/master
Commit: 1d765431601fb8ab7cca248baa973684d828afaa
Parents: 0c08577
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jan 12 15:19:22 2016 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jan 12 15:19:22 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../serviceplugins/api/ContainerLauncher.java   |  12 +-
 .../api/ContainerLauncherContext.java           |   3 +-
 .../api/ServicePluginException.java             |  36 +++
 .../tez/serviceplugins/api/TaskScheduler.java   |  48 +++-
 tez-dag/src/main/java/org/apache/tez/Utils.java |  66 +++++
 .../apache/tez/dag/api/TaskCommunicator.java    |  39 ++-
 .../dag/app/ContainerLauncherContextImpl.java   |  18 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  68 +++--
 .../dag/app/TaskCommunicatorContextImpl.java    |   6 +-
 .../tez/dag/app/TaskCommunicatorManager.java    | 151 +++++++++--
 .../app/TaskCommunicatorManagerInterface.java   |   3 +-
 .../tez/dag/app/TaskCommunicatorWrapper.java    |  83 ++++++
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |   2 +-
 .../tez/dag/app/dag/StateChangeNotifier.java    |   7 +-
 .../app/dag/event/DAGAppMasterEventType.java    |   3 +
 .../DAGAppMasterEventUserServiceFatalError.java |  46 ++++
 .../app/dag/event/DAGEventInternalError.java    |  32 +++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  22 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |   8 +-
 .../app/launcher/ContainerLauncherManager.java  |  49 +++-
 .../app/launcher/ContainerLauncherWrapper.java  |  40 +++
 .../app/launcher/LocalContainerLauncher.java    |   2 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java    | 271 ++++++++++++++++---
 .../tez/dag/app/rm/TaskSchedulerWrapper.java    |  90 ++++++
 .../dag/app/rm/container/AMContainerImpl.java   |  30 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   2 +-
 .../tez/dag/app/PluginWrapperTestHelpers.java   | 149 ++++++++++
 .../dag/app/TestTaskCommunicatorManager.java    |  99 ++++++-
 .../dag/app/TestTaskCommunicatorManager1.java   |   6 +-
 .../dag/app/TestTaskCommunicatorWrapper.java    |  43 +++
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   9 +-
 .../launcher/TestContainerLauncherManager.java  |  83 +++++-
 .../launcher/TestContainerLauncherWrapper.java  |  30 ++
 .../tez/dag/app/rm/TestContainerReuse.java      |   8 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  14 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    | 106 +++++++-
 .../dag/app/rm/TestTaskSchedulerWrapper.java    |  29 ++
 .../dag/app/rm/container/TestAMContainer.java   |  10 +-
 .../app/rm/container/TestAMContainerMap.java    |   3 +-
 .../org/apache/tez/examples/JoinValidate.java   |   4 +-
 ...zTestServiceContainerLauncherWithErrors.java |  37 +++
 ...stServiceTaskSchedulerServiceWithErrors.java |  93 +++++++
 ...ezTestServiceTaskCommunicatorWithErrors.java |  83 ++++++
 .../tez/examples/JoinValidateConfigured.java    |  10 +
 .../tez/tests/ExternalTezServiceTestHelper.java | 194 +++++++++++++
 .../tez/tests/TestExternalTezServices.java      | 125 +--------
 .../tests/TestExternalTezServicesErrors.java    | 235 ++++++++++++++++
 48 files changed, 2231 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cdc037..d39cbb8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-2669. Propagation of errors from plugins to the AM for error reporting.
   TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups.
   TEZ-2129. Task and Attempt views should contain links to the logs
   TEZ-3025. InputInitializer creation should use the dag ugi.

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 5a77b69..8792fd7 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -74,16 +74,22 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle {
   }
 
   /**
-   * A request to launch the specified container
+   * A request to launch the specified container, as described by the
+   * {@link ContainerLaunchRequest}
    *
    * @param launchRequest the actual launch request
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void launchContainer(ContainerLaunchRequest launchRequest);
+  public abstract void launchContainer(ContainerLaunchRequest launchRequest) throws
+      ServicePluginException;
 
   /**
    * A request to stop a specific container
    *
    * @param stopRequest the actual stop request
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void stopContainer(ContainerStopRequest stopRequest);
+  public abstract void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index dcd9e80..70a3498 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -99,12 +99,13 @@ public interface ContainerLauncherContext {
   ApplicationAttemptId getApplicationAttemptId();
 
   /**
-   * Get meta info from the specified TaskCommunicator. This assumes that the launched has been
+   * Get meta info from the specified TaskCommunicator. This assumes that the launcher has been
    * setup
    * along with a compatible TaskCommunicator, and the launcher knows how to read this meta-info
    *
    * @param taskCommName the name of the task communicator
    * @return meta info for the requested task communicator
+   *
    */
   Object getTaskCommunicatorMetaInfo(String taskCommName);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java
new file mode 100644
index 0000000..737329a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+/**
+ * Indicates an error from pluggable Tez Services.
+ */
+public class ServicePluginException extends Exception {
+
+  public ServicePluginException() {
+  }
+
+  public ServicePluginException(String message) {
+    super(message);
+  }
+
+  public ServicePluginException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ServicePluginException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index de76029..5875bd2 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -101,38 +101,48 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
    * Get the currently available resources from this source
    *
    * @return the resources available at the time of invocation
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract Resource getAvailableResources();
+  public abstract Resource getAvailableResources() throws ServicePluginException;
 
   /**
    * Get the total available resources from this source
    *
    * @return the total available resources from the source
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract Resource getTotalResources();
+  public abstract Resource getTotalResources() throws ServicePluginException;
 
   /**
    * Get the number of nodes available from the source
    *
    * @return the number of nodes
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract int getClusterNodeCount();
+  public abstract int getClusterNodeCount() throws ServicePluginException;
 
   /**
    * Indication to a source that a node has been blacklisted, and should not be used for subsequent
    * allocations.
    *
    * @param nodeId te nodeId to be blacklisted
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void blacklistNode(NodeId nodeId);
+  public abstract void blacklistNode(NodeId nodeId) throws ServicePluginException;
 
   /**
    * Indication to a source that a node has been un-blacklisted, and can be used from subsequent
    * allocations
    *
    * @param nodeId the nodeId to be unblacklisted
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void unblacklistNode(NodeId nodeId);
+  public abstract void unblacklistNode(NodeId nodeId) throws ServicePluginException;
 
   /**
    * A request to the source to allocate resources for a requesting task, with location information
@@ -150,10 +160,12 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
    * @param clientCookie       a cookie associated with this request. This should be returned back
    *                           via the {@link TaskSchedulerContext#taskAllocated(Object, Object,
    *                           Container)} method when a task is assigned to a resource
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
   public abstract void allocateTask(Object task, Resource capability,
                                     String[] hosts, String[] racks, Priority priority,
-                                    Object containerSignature, Object clientCookie);
+                                    Object containerSignature, Object clientCookie) throws ServicePluginException;
 
   /**
    * A request to the source to allocate resources for a requesting task, based on a previously used
@@ -171,11 +183,13 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
    * @param clientCookie       a cookie associated with this request. This should be returned back
    *                           via the {@link TaskSchedulerContext#taskAllocated(Object, Object,
    *                           Container)} method when a task is assigned to a resource
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
   public abstract void allocateTask(Object task, Resource capability,
                                     ContainerId containerId, Priority priority,
                                     Object containerSignature,
-                                    Object clientCookie);
+                                    Object clientCookie) throws ServicePluginException;
 
   /**
    * A request to deallocate a task. This is typically a result of a task completing - with success
@@ -190,38 +204,48 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
    * @param endReason     the reason for the task failure
    * @param diagnostics   additional diagnostics information which may be relevant
    * @return true if the task was associated with a container, false if the task was not associated
    * with a container
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
   public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
                                          TaskAttemptEndReason endReason,
-                                         @Nullable String diagnostics);
+                                         @Nullable String diagnostics) throws ServicePluginException;
 
   /**
    * A request to de-allocate a previously allocated container.
    *
    * @param containerId the containerId to de-allocate
    * @return the task which was previously associated with this container, null otherwise
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract Object deallocateContainer(ContainerId containerId);
+  public abstract Object deallocateContainer(ContainerId containerId) throws ServicePluginException;
 
   /**
    * Inform the scheduler that it should unregister. This is primarily valid for schedulers which
    * require registration (YARN a.t.m)
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void setShouldUnregister();
+  public abstract void setShouldUnregister() throws ServicePluginException;
 
   /**
    * Checks with the scheduler whether it has unregistered.
    *
    * @return true if the scheduler has unregistered. False otherwise.
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract boolean hasUnregistered();
+  public abstract boolean hasUnregistered() throws ServicePluginException;
 
   /**
    * Indicates to the scheduler that the currently running dag has completed.
    * This can be used to reset dag specific statistics, potentially release resources and prepare
    * for a new DAG.
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void dagComplete();
+  public abstract void dagComplete() throws ServicePluginException;
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/Utils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
new file mode 100644
index 0000000..959b536
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.app.AppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class within the tez-dag module
+ */
+@InterfaceAudience.Private
+public class Utils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+  public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) {
+    String name;
+    try {
+      name = appContext.getContainerLauncherName(launcherIndex);
+    } catch (Exception e) {
+      LOG.error("Unable to get launcher name for index: " + launcherIndex +
+          ", falling back to reporting the index");
+      return "[" + String.valueOf(launcherIndex) + "]";
+    }
+    return "[" + launcherIndex + ":" + name + "]";
+  }
+
+  public static String getTaskCommIdentifierString(int taskCommIndex, AppContext appContext) {
+    String name;
+    try {
+      name = appContext.getTaskCommunicatorName(taskCommIndex);
+    } catch (Exception e) {
+      LOG.error("Unable to get taskcomm name for index: " + taskCommIndex +
+          ", falling back to reporting the index");
+      return "[" + String.valueOf(taskCommIndex) + "]";
+    }
+    return "[" + taskCommIndex + ":" + name + "]";
+  }
+
+  public static String getTaskSchedulerIdentifierString(int schedulerIndex, AppContext appContext) {
+    String name;
+    try {
+      name = appContext.getTaskSchedulerName(schedulerIndex);
+    } catch (Exception e) {
+      LOG.error("Unable to get scheduler name for index: " + schedulerIndex +
+          ", falling back to reporting the index");
+      return "[" + String.valueOf(schedulerIndex) + "]";
+    }
+    return "[" + schedulerIndex + ":" + name + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 38742de..1b6ad07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.ServicePluginLifecycle;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -107,8 +108,11 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * @param containerId the associated containerId
    * @param hostname    the hostname on which the container runs
    * @param port        the port for the service which is running the container
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
+  public abstract void registerRunningContainer(ContainerId containerId, String hostname,
+                                                int port) throws ServicePluginException;
 
   /**
    * Register the end of a container. This can be caused by preemption, the container completing
@@ -117,9 +121,12 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * @param containerId the associated containerId
    * @param endReason   the end reason for the container completing
    * @param diagnostics diagnostics associated with the container end
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
   public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
-                                            @Nullable String diagnostics);
+                                            @Nullable String diagnostics) throws
+      ServicePluginException;
 
   /**
    * Register a task attempt to execute on a container
@@ -133,11 +140,14 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * @param credentialsChanged  whether the credentials are different from the original credentials
    *                            associated with this container
    * @param priority            the priority of the task being executed
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
   public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
                                                   Map<String, LocalResource> additionalResources,
                                                   Credentials credentials,
-                                                  boolean credentialsChanged, int priority);
+                                                  boolean credentialsChanged, int priority) throws
+      ServicePluginException;
 
   /**
    * Register the completion of a task. This may be a result of preemption, the container dying,
@@ -146,17 +156,22 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * @param taskAttemptID the task attempt which has completed / needs to be completed
    * @param endReason     the endReason for the task attempt.
    * @param diagnostics   diagnostics associated with the task end
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
   public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
                                                     TaskAttemptEndReason endReason,
-                                                    @Nullable String diagnostics);
+                                                    @Nullable String diagnostics) throws
+      ServicePluginException;
 
   /**
    * Return the address, if any, that the service listens on
    *
    * @return the address
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract InetSocketAddress getAddress();
+  public abstract InetSocketAddress getAddress() throws ServicePluginException;
 
   /**
    * Receive notifications on vertex state changes.
@@ -175,9 +190,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
    *                    Additional information may be available for specific events, Look at the
    *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
-   * @throws Exception
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException;
 
   /**
    * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
@@ -187,9 +203,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * the next dag being submitted.
    *
    * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context.
-   *
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract void dagComplete(int dagIdentifier);
+  public abstract void dagComplete(int dagIdentifier) throws ServicePluginException;
 
   /**
    * Share meta-information such as host:port information where the Task Communicator may be
@@ -197,6 +214,8 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    * Primarily for use by compatible launchers to learn this information.
    *
    * @return meta info for the task communicator
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
    */
-  public abstract Object getMetaInfo();
+  public abstract Object getMetaInfo() throws ServicePluginException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index a2e0dd6..9434256 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -19,6 +19,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
@@ -29,10 +31,13 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("unchecked")
 public class ContainerLauncherContextImpl implements ContainerLauncherContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherContextImpl.class);
   private final AppContext context;
   private final TaskCommunicatorManagerInterface tal;
   private final UserPayload initialUserPayload;
@@ -101,7 +106,18 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
   @Override
   public Object getTaskCommunicatorMetaInfo(String taskCommName) {
     int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName);
-    return tal.getTaskCommunicator(taskCommId).getMetaInfo();
+    try {
+      return tal.getTaskCommunicator(taskCommId).getMetaInfo();
+    } catch (Exception e) {
+      String msg = "Error in retrieving meta-info from TaskCommunicator"
+          + ", communicatorName=" + context.getTaskCommunicatorName(taskCommId);
+      LOG.error(msg, e);
+      context.getEventHandler().handle(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
+    return null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c0b86a5..609a018 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -63,6 +63,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
@@ -72,6 +73,8 @@ import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -151,10 +154,6 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
-import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
-import org.apache.tez.dag.app.dag.impl.TaskImpl;
-import org.apache.tez.dag.app.dag.impl.VertexImpl;
-import org.apache.tez.dag.app.launcher.LocalContainerLauncher;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.ContainerLauncherEventType;
 import org.apache.tez.dag.app.rm.TaskSchedulerManager;
@@ -671,8 +670,34 @@ public class DAGAppMaster extends AbstractService {
     return taskSchedulerManager;
   }
 
+  private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagEvent) {
+    state = DAGAppMasterState.ERROR;
+    if (currentDAG != null) {
+      _updateLoggers(currentDAG, "_post");
+      String errDiagnostics = errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID();
+      LOG.info(errDiagnostics);
+      // Inform the current DAG about the error
+      sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent));
+    } else {
+      LOG.info(errDiagnosticsPrefix + ". AppMaster will exit as no dag is active");
+      // This could be problematic if the scheduler generated the error,
+      // since un-registration may not be possible.
+      // For now - try setting this flag, but call the shutdownHandler irrespective of
+      // how the flag is handled by user code.
+      try {
+        this.taskSchedulerManager.setShouldUnregisterFlag();
+      } catch (Exception e) {
+        // Ignore exception for now
+        LOG.error("Error when trying to set unregister flag for TaskScheduler", e);
+      } finally {
+        shutdownHandler.shutdown();
+      }
+    }
+  }
+
   @VisibleForTesting
   protected synchronized void handle(DAGAppMasterEvent event) {
+    String errDiagnostics;
     switch (event.getType()) {
     case SCHEDULING_SERVICE_ERROR:
       // Scheduling error - probably an issue with the communication with the RM
@@ -683,22 +708,30 @@ public class DAGAppMaster extends AbstractService {
       DAGAppMasterEventSchedulingServiceError schedulingServiceErrorEvent =
           (DAGAppMasterEventSchedulingServiceError) event;
       state = DAGAppMasterState.ERROR;
-      LOG.info("Error in the TaskScheduler. Shutting down.",
-          schedulingServiceErrorEvent.getThrowable());
+      errDiagnostics = "Error in the TaskScheduler. Shutting down. ";
+      addDiagnostic(errDiagnostics
+          + "Error=" + ExceptionUtils.getStackTrace(schedulingServiceErrorEvent.getThrowable()));
+      LOG.error(errDiagnostics, schedulingServiceErrorEvent.getThrowable());
       shutdownHandler.shutdown();
       break;
+    case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR:
+    case CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR:
+    case TASK_SCHEDULER_SERVICE_FATAL_ERROR:
+      // A fatal error from the pluggable services. The AM cannot continue operation, and should
+      // be shut down. The AM should not be restarted for recovery.
+      DAGAppMasterEventUserServiceFatalError usfe = (DAGAppMasterEventUserServiceFatalError) event;
+      Throwable error = usfe.getError();
+      errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo()
+          + ", eventType=" + event.getType()
+          + ", exception=" + ExceptionUtils.getStackTrace(usfe.getError());
+      LOG.error(errDiagnostics, error);
+      addDiagnostic(errDiagnostics);
+
+      handleInternalError("Service error: " + event.getType(), errDiagnostics);
+      break;
     case INTERNAL_ERROR:
-      state = DAGAppMasterState.ERROR;
-      if(currentDAG != null) {
-        _updateLoggers(currentDAG, "_post");
-        // notify dag to finish which will send the DAG_FINISHED event
-        LOG.info("Internal Error. Notifying dags to finish.");
-        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
-      } else {
-        LOG.info("Internal Error. Finishing directly as no dag is active.");
-        this.taskSchedulerManager.setShouldUnregisterFlag();
-        shutdownHandler.shutdown();
-      }
+      handleInternalError("DAGAppMaster Internal Error occurred",
+          "DAGAppMaster Internal Error occurred");
       break;
     case DAG_FINISHED:
       DAGAppMasterEventDAGFinished finishEvt =
@@ -756,6 +789,7 @@ public class DAGAppMaster extends AbstractService {
           LOG.error("Received a DAG Finished Event with state="
               + finishEvt.getDAGState()
               + ". Error. Shutting down.");
+          addDiagnostic("DAG completed with an ERROR state. Shutting down AM");
           state = DAGAppMasterState.ERROR;
           this.taskSchedulerManager.setShouldUnregisterFlag();
           shutdownHandler.shutdown();

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 6ae6dad..2b7234c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -205,11 +205,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
 
   @Override
   public void onStateUpdated(VertexStateUpdate event) {
-    try {
-      taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
-    } catch (Exception e) {
-      throw new TezUncheckedException(e);
-    }
+    taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
   }
 
   private DAG getDag() {

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 92bf3c4..64a964b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -30,9 +30,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.tez.Utils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -48,7 +53,6 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
@@ -81,7 +85,7 @@ public class TaskCommunicatorManager extends AbstractService implements
       .getLogger(TaskCommunicatorManager.class);
 
   private final AppContext context;
-  private final TaskCommunicator[] taskCommunicators;
+  private final TaskCommunicatorWrapper[] taskCommunicators;
   private final TaskCommunicatorContext[] taskCommunicatorContexts;
   protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
 
@@ -106,6 +110,24 @@ public class TaskCommunicatorManager extends AbstractService implements
   private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
 
 
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  /**
+   * Only for testing.
+   */
+  public TaskCommunicatorManager(TaskCommunicator taskCommunicator, AppContext appContext,
+                                 TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+    super(TaskCommunicatorManager.class.getName());
+    this.context = appContext;
+    this.taskHeartbeatHandler = thh;
+    this.containerHeartbeatHandler = chh;
+    taskCommunicators =
+        new TaskCommunicatorWrapper[]{new TaskCommunicatorWrapper(taskCommunicator)};
+    taskCommunicatorContexts = new TaskCommunicatorContext[]{taskCommunicator.getContext()};
+    taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[]{
+        new ServicePluginLifecycleAbstractService(taskCommunicator)};
+  }
+
   public TaskCommunicatorManager(AppContext context,
                                  TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
                                  List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
@@ -116,14 +138,15 @@ public class TaskCommunicatorManager extends AbstractService implements
     Preconditions.checkArgument(
         taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(),
         "TaskCommunicators must be specified");
-    this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
+    this.taskCommunicators = new TaskCommunicatorWrapper[taskCommunicatorDescriptors.size()];
     this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
     this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
     for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
       UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
       taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
-      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
-      taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
+      taskCommunicators[i] = new TaskCommunicatorWrapper(createTaskCommunicator(taskCommunicatorDescriptors.get(i), i));
+      taskCommunicatorServiceWrappers[i] =
+          new ServicePluginLifecycleAbstractService(taskCommunicators[i].getTaskCommunicator());
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
   }
@@ -269,11 +292,11 @@ public class TaskCommunicatorManager extends AbstractService implements
       }
       if (taskAttemptEvent != null) {
         taskAttemptEvent.setReadErrorReported(readErrorReported);
-        context.getEventHandler().handle(taskAttemptEvent);
+        sendEvent(taskAttemptEvent);
       }
       // route taGeneratedEvents to TaskAttempt
       if (!taGeneratedEvents.isEmpty()) {
-        context.getEventHandler().handle(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
+        sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
       }
       // route events to TaskAttempt
       Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent");
@@ -300,14 +323,14 @@ public class TaskCommunicatorManager extends AbstractService implements
                 sourceMeta.getEventGenerator());
           }
           TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent();
-          context.getEventHandler().handle(
+          sendEvent(
                new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
                    TaskAttemptEventType.TA_FAILED,
                   "Error: " + taskFailedEvent.getDiagnostics(),
                     errCause));
           break;
         case TASK_ATTEMPT_COMPLETED_EVENT:
-          context.getEventHandler().handle(
+          sendEvent(
               new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
           break;
         default:
@@ -317,7 +340,7 @@ public class TaskCommunicatorManager extends AbstractService implements
       }
       if (!eventsForVertex.isEmpty()) {
         TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
-        context.getEventHandler().handle(
+        sendEvent(
             new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
       }
       taskHeartbeatHandler.pinged(taskAttemptID);
@@ -339,8 +362,7 @@ public class TaskCommunicatorManager extends AbstractService implements
   }
 
   public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
-    context.getEventHandler()
-        .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+    sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
     pingContainerHeartbeatHandler(containerId);
   }
 
@@ -351,7 +373,7 @@ public class TaskCommunicatorManager extends AbstractService implements
     // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
     // instead of waiting for the unregister to flow through the Container.
     // Fix along the same lines as TEZ-2124 by introducing an explict context.
-    context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
+    sendEvent(new TaskAttemptEventAttemptKilled(taskAttemptId,
         diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
         taskAttemptEndReason)));
   }
@@ -363,14 +385,25 @@ public class TaskCommunicatorManager extends AbstractService implements
     // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
     // instead of waiting for the unregister to flow through the Container.
     // Fix along the same lines as TEZ-2124 by introducing an explict context.
-    context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
+    sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId,
         TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
         taskAttemptEndReason)));
   }
 
-  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
-      Exception {
-    taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) {
+    try {
+      taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+    } catch (Exception e) {
+      String msg = "Error in TaskCommunicator when handling vertex state update notification"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context)
+          + ", vertexName=" + event.getVertexName()
+          + ", vertexState=" + event.getVertexState();
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
   }
 
 
@@ -410,9 +443,19 @@ public class TaskCommunicatorManager extends AbstractService implements
 
     // Inform all communicators of the dagCompletion.
     for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
-      taskCommunicators[i].dagComplete(dag.getID().getId());
-      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+      try {
+        ((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteStart(dag);
+        taskCommunicators[i].dagComplete(dag.getID().getId());
+        ((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteEnd();
+      } catch (Exception e) {
+        String msg = "Error in TaskCommunicator when notifying for DAG completion"
+            + ", communicator=" + Utils.getTaskCommIdentifierString(i, context);
+        LOG.error(msg, e);
+        sendEvent(
+            new DAGAppMasterEventUserServiceFatalError(
+                DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+                msg, e));
+      }
     }
 
   }
@@ -434,8 +477,20 @@ public class TaskCommunicatorManager extends AbstractService implements
           "Multiple registrations for containerId: " + containerId);
     }
     NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
-    taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
-        nodeId.getPort());
+    try {
+      taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+          nodeId.getPort());
+    } catch (Exception e) {
+      String msg = "Error in TaskCommunicator when registering running Container"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+          + ", containerId=" + containerId
+          + ", nodeId=" + nodeId;
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
   }
 
   @Override
@@ -447,7 +502,18 @@ public class TaskCommunicatorManager extends AbstractService implements
     if (containerInfo.taskAttemptId != null) {
       registeredAttempts.remove(containerInfo.taskAttemptId);
     }
-    taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
+    try {
+      taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
+    } catch (Exception e) {
+      String msg = "Error in TaskCommunicator when unregistering Container"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+          + ", containerId=" + containerId;
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
   }
 
   @Override
@@ -475,9 +541,21 @@ public class TaskCommunicatorManager extends AbstractService implements
           + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
           + " when already assigned to: " + containerIdFromMap);
     }
-    taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
-        amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
-        amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
+    try {
+      taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+          amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
+          amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
+    } catch (Exception e) {
+      String msg = "Error in TaskCommunicator when registering Task Attempt"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+          + ", containerId=" + containerId
+          + ", taskId=" + amContainerTask.getTask().getTaskAttemptID();
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
   }
 
   @Override
@@ -495,11 +573,23 @@ public class TaskCommunicatorManager extends AbstractService implements
     }
     // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
     registeredContainers.put(containerId, NULL_CONTAINER_INFO);
-    taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
+    try {
+      taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
+    } catch (Exception e) {
+      String msg = "Error in TaskCommunicator when unregistering Task Attempt"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+          + ", containerId=" + containerId
+          + ", taskId=" + attemptId;
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
   }
 
   @Override
-  public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+  public TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex) {
     return taskCommunicators[taskCommIndex];
   }
 
@@ -516,4 +606,9 @@ public class TaskCommunicatorManager extends AbstractService implements
           + ", ContainerId not known for this attempt");
     }
   }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    context.getEventHandler().handle(event);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
index 8d060a2..e07b1a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 /**
@@ -42,5 +41,5 @@ public interface TaskCommunicatorManagerInterface {
 
   void dagSubmitted();
 
-  TaskCommunicator getTaskCommunicator(int taskCommIndex);
+  TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
new file mode 100644
index 0000000..4f9780e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+
+public class TaskCommunicatorWrapper {
+
+  private final TaskCommunicator real;
+
+  public TaskCommunicatorWrapper(TaskCommunicator real) {
+    this.real = real;
+  }
+
+
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) throws
+      Exception {
+    real.registerRunningContainer(containerId, hostname, port);
+  }
+
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+                                   @Nullable String diagnostics) throws Exception {
+    real.registerContainerEnd(containerId, endReason, diagnostics);
+
+  }
+
+  public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials, boolean credentialsChanged,
+                                         int priority) throws Exception {
+    real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority);
+  }
+
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+                                           TaskAttemptEndReason endReason,
+                                           @Nullable String diagnostics) throws Exception {
+    real.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics);
+  }
+
+  public InetSocketAddress getAddress() throws Exception {
+    return real.getAddress();
+  }
+
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    real.onVertexStateUpdated(stateUpdate);
+  }
+
+  public void dagComplete(int dagIdentifier) throws Exception {
+    real.dagComplete(dagIdentifier);
+  }
+
+  public Object getMetaInfo() throws Exception {
+    return real.getMetaInfo();
+  }
+
+  public TaskCommunicator getTaskCommunicator() {
+    return real;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 78e95bd..d071e0d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -273,7 +273,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   }
 
   @Override
-  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
     // Empty. Not registering, or expecting any updates.
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 990bdea..bd04fd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -34,10 +34,12 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -110,7 +112,10 @@ public class StateChangeNotifier {
           } catch (Exception e) {
             // TODO send user code exception - TEZ-2332
             LOG.error("Error in state update notification for " + event, e);
-            dag.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
+            dag.getEventHandler().handle(
+                new DAGEventInternalError(dag.getID(),
+                    "Internal Error in State Update Notification: "
+                        + ExceptionUtils.getStackTrace(e)));
             return;
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
index 5a102a5..9cf2414 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
@@ -22,6 +22,9 @@ public enum DAGAppMasterEventType {
   INTERNAL_ERROR,
   AM_REBOOT,
   DAG_FINISHED,
+  TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+  CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+  TASK_SCHEDULER_SERVICE_FATAL_ERROR,
   SCHEDULING_SERVICE_ERROR,
   NEW_DAG_SUBMITTED, // Indicates a new dag being submitted, to notify sub-components
   DAG_CLEANUP

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java
new file mode 100644
index 0000000..7bc3bd8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.EnumSet;
+
+import com.google.common.base.Preconditions;
+
+public class DAGAppMasterEventUserServiceFatalError extends DAGAppMasterEvent implements DiagnosableEvent {
+
+  private final Throwable error;
+  private final String diagnostics;
+
+  public DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType type,
+                                                String diagnostics, Throwable t) {
+    super(type);
+    Preconditions.checkArgument(
+        EnumSet.of(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+            DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+            DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR).contains(type),
+        "Event created with incorrect type: " + type);
+    this.error = t;
+    this.diagnostics = diagnostics;
+  }
+
+  public Throwable getError() {
+    return error;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagnostics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java
new file mode 100644
index 0000000..724ecbe
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezDAGID;
+
+/**
+ * {@link DAGEvent} of type {@link DAGEventType#INTERNAL_ERROR} that additionally carries
+ * a diagnostics string, made available through {@link #getDiagnosticInfo()}.
+ */
+public class DAGEventInternalError extends DAGEvent implements DiagnosableEvent {
+
+  /** Diagnostics text supplied at construction; handed back unchanged. */
+  private final String diagnosticInfo;
+
+  /**
+   * @param dagId       DAG id passed on to the {@link DAGEvent} constructor
+   * @param diagnostics text later returned by {@link #getDiagnosticInfo()}
+   */
+  public DAGEventInternalError(TezDAGID dagId, String diagnostics) {
+    super(dagId, DAGEventType.INTERNAL_ERROR);
+    diagnosticInfo = diagnostics;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return this.diagnosticInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 60f933f..41017ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -43,6 +43,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
 import org.slf4j.Logger;
@@ -2252,13 +2254,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private static class InternalErrorTransition implements
       SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
-    public void transition(DAGImpl job, DAGEvent event) {
-      LOG.info(job.getID() + " terminating due to internal error");
+    public void transition(DAGImpl dag, DAGEvent event) {
+      // Events implementing DiagnosableEvent carry a description of the failure;
+      // record it on the DAG and include it in the log line below.
+      String diagnostics = null;
+      if (event instanceof DiagnosableEvent) {
+        diagnostics = ((DiagnosableEvent) event).getDiagnosticInfo();
+        if (diagnostics != null) {
+          // Avoid handing a null diagnostic to the DAG.
+          dag.addDiagnostic(diagnostics);
+        }
+      }
+
+      // One space before "Error=", and no dangling separator when there is no detail.
+      LOG.info(dag.getID() + " terminating due to internal error."
+          + (diagnostics == null ? "" : " Error=" + diagnostics));
       // terminate all vertices
-      job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
-      job.setFinishTime();
-      job.cancelCommits();
-      job.finished(DAGState.ERROR);
+      dag.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
+      dag.setFinishTime();
+      dag.cancelCommits();
+      dag.finished(DAGState.ERROR);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 379e316..388d3c7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -56,8 +58,6 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.CallableEvent;
-import org.apache.tez.dag.app.dag.event.DAGEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
@@ -530,7 +530,9 @@ public class VertexManager {
       // state change must be triggered via an event transition
       LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e);
       appContext.getEventHandler().handle(
-          (new DAGEvent(managedVertex.getVertexId().getDAGId(), DAGEventType.INTERNAL_ERROR)));
+          (new DAGEventInternalError(managedVertex.getVertexId().getDAGId(),
+              "Error in VertexManager for vertex: " + managedVertex.getLogIdentifier()
+              + ", error=" + ExceptionUtils.getStackTrace(e))));
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 9e56f44..98237c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -22,13 +22,17 @@ import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.Utils;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -49,7 +53,7 @@ public class ContainerLauncherManager extends AbstractService
   static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class);
 
   @VisibleForTesting
-  final ContainerLauncher containerLaunchers[];
+  final ContainerLauncherWrapper containerLaunchers[];
   @VisibleForTesting
   final ContainerLauncherContext containerLauncherContexts[];
   protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
@@ -59,7 +63,7 @@ public class ContainerLauncherManager extends AbstractService
   public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) {
     super(ContainerLauncherManager.class.getName());
     this.appContext = context;
-    containerLaunchers = new ContainerLauncher[] {containerLauncher};
+    containerLaunchers = new ContainerLauncherWrapper[] {new ContainerLauncherWrapper(containerLauncher)};
     containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
     containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
         new ServicePluginLifecycleAbstractService<>(containerLauncher)};
@@ -78,7 +82,7 @@ public class ContainerLauncherManager extends AbstractService
         containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
         "ContainerLauncherDescriptors must be specified");
     containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
-    containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
+    containerLaunchers = new ContainerLauncherWrapper[containerLauncherDescriptors.size()];
     containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
 
 
@@ -87,9 +91,9 @@ public class ContainerLauncherManager extends AbstractService
       ContainerLauncherContext containerLauncherContext =
           new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload);
       containerLauncherContexts[i] = containerLauncherContext;
-      containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
-          containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode);
-      containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
+      containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context,
+          containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode));
+      containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i].getContainerLauncher());
     }
   }
 
@@ -197,14 +201,43 @@ public class ContainerLauncherManager extends AbstractService
                 launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
                 launchEvent.getContainer(), schedulerName,
                 taskCommName);
-        containerLaunchers[launcherId].launchContainer(launchRequest);
+        try {
+          containerLaunchers[launcherId].launchContainer(launchRequest);
+        } catch (Exception e) {
+          String msg = "Error when launching container"
+              + ", containerLauncher=" + Utils.getContainerLauncherIdentifierString(launcherId, appContext)
+              + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+              + ", taskCommunicator=" + Utils.getTaskCommIdentifierString(event.getTaskCommId(), appContext);
+          LOG.error(msg, e);
+          sendEvent(
+              new DAGAppMasterEventUserServiceFatalError(
+                  DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+                  msg, e));
+        }
         break;
       case CONTAINER_STOP_REQUEST:
         ContainerStopRequest stopRequest =
             new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
                 event.getContainerToken(), schedulerName, taskCommName);
-        containerLaunchers[launcherId].stopContainer(stopRequest);
+        try {
+          containerLaunchers[launcherId].stopContainer(stopRequest);
+        } catch (Exception e) {
+          String msg = "Error when stopping container"
+              + ", containerLauncher=" + Utils.getContainerLauncherIdentifierString(launcherId, appContext)
+              + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+              + ", taskCommunicator=" + Utils.getTaskCommIdentifierString(event.getTaskCommId(), appContext);
+          LOG.error(msg, e);
+          sendEvent(
+              new DAGAppMasterEventUserServiceFatalError(
+                  DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+                  msg, e));
+        }
         break;
     }
   }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    appContext.getEventHandler().handle(event);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
new file mode 100644
index 0000000..08e287e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+
+/**
+ * Pass-through holder for a {@link ContainerLauncher}: launch and stop requests are
+ * forwarded unchanged to the wrapped instance, which remains reachable through
+ * {@link #getContainerLauncher()}.
+ */
+public class ContainerLauncherWrapper {
+
+  /** The launcher every call is delegated to. */
+  private final ContainerLauncher delegate;
+
+  /**
+   * @param containerLauncher the launcher to wrap
+   */
+  public ContainerLauncherWrapper(ContainerLauncher containerLauncher) {
+    delegate = containerLauncher;
+  }
+
+  /** Forwards the launch request to the wrapped launcher. */
+  public void launchContainer(ContainerLaunchRequest launchRequest) throws Exception {
+    delegate.launchContainer(launchRequest);
+  }
+
+  /** Forwards the stop request to the wrapped launcher. */
+  public void stopContainer(ContainerStopRequest stopRequest) throws Exception {
+    delegate.stopContainer(stopRequest);
+  }
+
+  /** Returns the wrapped launcher. */
+  public ContainerLauncher getContainerLauncher() {
+    return delegate;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index c4ab6e3..b737fda 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -228,7 +228,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
         tezChild =
             createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
                 context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
-                ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId)).getUmbilical(),
+                ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(),
                 TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
       } catch (InterruptedException e) {
         handleLaunchFailed(e, event.getContainerId());


Mime
View raw message