tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/3] tez git commit: TEZ-2669. Propagation of errors from plugins to the AM for error reporting. Contributed by Hitesh Shah and Siddharth Seth.
Date Tue, 12 Jan 2016 23:20:05 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index f688b57..e4612b6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -32,9 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.Utils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
@@ -112,7 +115,7 @@ public class TaskSchedulerManager extends AbstractService implements
       new AtomicBoolean(false);
   private final WebUIService webUI;
   private final NamedEntityDescriptor[] taskSchedulerDescriptors;
-  protected final TaskScheduler[]taskSchedulers;
+  protected final TaskSchedulerWrapper[] taskSchedulers;
   protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
 
   // Single executor service shared by all Schedulers for context callbacks
@@ -134,6 +137,29 @@ public class TaskSchedulerManager extends AbstractService implements
   // Not tracking container / task to schedulerId. Instead relying on everything flowing through
   // the system and being propagated back via events.
 
+  /**
+   * For testing only: manages one pre-built scheduler; descriptors, webUI and historyUrl are null.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext,
+                              ContainerSignatureMatcher containerSignatureMatcher,
+                              DAGClientServer clientService, ExecutorService appCallbackExecutor) {
+    super(TaskSchedulerManager.class.getName());
+    this.appContext = appContext;
+    this.containerSignatureMatcher = containerSignatureMatcher;
+    this.clientService = clientService;
+    this.eventHandler = appContext.getEventHandler();
+    this.appCallbackExecutor = appCallbackExecutor;
+    this.taskSchedulers = new TaskSchedulerWrapper[]{new TaskSchedulerWrapper(taskScheduler)};
+    this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[]{
+        new ServicePluginLifecycleAbstractService<>(taskScheduler)};
+    this.taskSchedulerDescriptors = null;
+    this.webUI = null;
+    this.historyUrl = null;
+    this.isPureLocalMode = false;
+  }
+
   /**
    *
    * @param appContext
@@ -169,7 +195,7 @@ public class TaskSchedulerManager extends AbstractService implements
 
     this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
 
-    taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
+    taskSchedulers = new TaskSchedulerWrapper[this.taskSchedulerDescriptors.length];
     taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
   }
 
@@ -187,11 +213,33 @@ public class TaskSchedulerManager extends AbstractService implements
   }
   
   public Resource getAvailableResources(int schedulerId) {
-    return taskSchedulers[schedulerId].getAvailableResources();
+    try {  // the wrapper call declares 'throws Exception'; plugin failures surface here
+      return taskSchedulers[schedulerId].getAvailableResources();
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler while getting available resources"
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      throw new RuntimeException(e);  // no Resource to return; rethrow after notifying the AM
+    }
   }
 
   public Resource getTotalResources(int schedulerId) {
-    return taskSchedulers[schedulerId].getTotalResources();
+    try {  // the wrapper call declares 'throws Exception'; plugin failures surface here
+      return taskSchedulers[schedulerId].getTotalResources();
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler while getting total resources"
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      throw new RuntimeException(e);  // no Resource to return; rethrow after notifying the AM
+    }
   }
 
   private ExecutorService createAppCallbackExecutorService() {
@@ -265,11 +313,27 @@ public class TaskSchedulerManager extends AbstractService implements
   }
 
   private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
-    if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
-      taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
-    } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
-      taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
-    } else {
+    boolean invalidEventType = false;  // recorded in the try, acted on after it (see below)
+    try {
+      if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
+        taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
+      } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
+        taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
+      } else {
+        invalidEventType = true;
+      }
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler for handling node blacklisting"
+          + ", eventType=" + event.getType()
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext);
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      return;  // failure already reported as a fatal scheduler error; nothing more to do
+    }
+    if (invalidEventType) {  // thrown outside the try so it is not reported as a plugin error
       throw new TezUncheckedException("Invalid event type: " + event.getType());
     }
   }
@@ -280,7 +344,20 @@ public class TaskSchedulerManager extends AbstractService implements
     // TODO what happens to the task that was connected to this container?
     // current assumption is that it will eventually call handleTaStopRequest
     //TaskAttempt taskAttempt = (TaskAttempt)
-    taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
+    try {
+      taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler for handling Container De-allocation"
+          + ", eventType=" + event.getType()
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+          + ", containerId=" + containerId;
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      return;
+    }
     // TODO does this container need to be stopped via C_STOP_REQUEST
     sendEvent(new AMContainerEventStopRequest(containerId));
   }
@@ -288,8 +365,22 @@ public class TaskSchedulerManager extends AbstractService implements
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
     TaskAttempt attempt = event.getAttempt();
     // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
-    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
-        .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
+    boolean wasContainerAllocated = false;
+    try {
+      wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
+          .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler for handling Task De-allocation"
+          + ", eventType=" + event.getType()
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+          + ", taskAttemptId=" + attempt.getID();
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      return;
+    }
     // use stored value of container id in case the scheduler has removed this
     // assignment because the task has been deallocated earlier.
     // retroactive case
@@ -333,8 +424,24 @@ public class TaskSchedulerManager extends AbstractService implements
           event.getAttemptID()));
     }
 
-    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
+    boolean wasContainerAllocated = false;
+
+    try {
+      wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
         true, null, event.getDiagnostics());
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler for handling Task De-allocation"
+          + ", eventType=" + event.getType()
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+          + ", taskAttemptId=" + attempt.getID();
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      return;
+    }
+
     if (!wasContainerAllocated) {
       LOG.error("De-allocated successful task: " + attempt.getID()
           + ", but TaskScheduler reported no container assigned to task");
@@ -359,12 +466,24 @@ public class TaskSchedulerManager extends AbstractService implements
         TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
         if (affinityAttempt != null) {
           Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
-          taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
-              event.getCapability(),
-              affinityAttempt.getAssignedContainerID(),
-              Priority.newInstance(event.getPriority()),
-              event.getContainerContext(),
-              event);
+          try {
+            taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
+                event.getCapability(),
+                affinityAttempt.getAssignedContainerID(),
+                Priority.newInstance(event.getPriority()),
+                event.getContainerContext(),
+                event);
+          } catch (Exception e) {
+            String msg = "Error in TaskScheduler for handling Task Allocation"
+                + ", eventType=" + event.getType()
+                + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+                + ", taskAttemptId=" + taskAttempt.getID();
+            LOG.error(msg, e);
+            sendEvent(
+                new DAGAppMasterEventUserServiceFatalError(
+                    DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+                    msg, e));
+          }
           return;
         }
         LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
@@ -379,21 +498,33 @@ public class TaskSchedulerManager extends AbstractService implements
       }
     }
 
-    taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
-        event.getCapability(),
-        hosts,
-        racks,
-        Priority.newInstance(event.getPriority()),
-        event.getContainerContext(),
-        event);
+    try {
+      taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
+          event.getCapability(),
+          hosts,
+          racks,
+          Priority.newInstance(event.getPriority()),
+          event.getContainerContext(),
+          event);
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler for handling Task Allocation"
+          + ", eventType=" + event.getType()
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+          + ", taskAttemptId=" + taskAttempt.getID();
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
   }
 
   @VisibleForTesting
   TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
-                                                   AppContext appContext,
-                                                   NamedEntityDescriptor taskSchedulerDescriptor,
-                                                   long customAppIdIdentifier,
-                                                   int schedulerId) throws TezException {
+                                    AppContext appContext,
+                                    NamedEntityDescriptor taskSchedulerDescriptor,
+                                    long customAppIdIdentifier,
+                                    int schedulerId) throws TezException {
     TaskSchedulerContext rawContext =
         new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
             customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
@@ -452,9 +583,10 @@ public class TaskSchedulerManager extends AbstractService implements
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
       }
-      taskSchedulers[i] = createTaskScheduler(host, port,
-          trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
-      taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
+      taskSchedulers[i] = new TaskSchedulerWrapper(createTaskScheduler(host, port,
+          trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i));
+      taskSchedulerServiceWrappers[i] =
+          new ServicePluginLifecycleAbstractService<>(taskSchedulers[i].getTaskScheduler());
     }
   }
 
@@ -521,7 +653,13 @@ public class TaskSchedulerManager extends AbstractService implements
 
   public void initiateStop() {
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      taskSchedulers[i].initiateStop();
+      try {  // per-scheduler try: a failure in one does not skip the remaining schedulers
+        taskSchedulers[i].getTaskScheduler().initiateStop();  // called on the plugin directly
+      } catch (Exception e) {
+        // Ignore for now as scheduler stop invoked on shutdown; logged only, no fatal-error event
+        LOG.error("Failed to do a clean initiateStop for Scheduler: "
+            + Utils.getTaskSchedulerIdentifierString(i, appContext), e);
+      }
     }
   }
 
@@ -686,7 +824,19 @@ public class TaskSchedulerManager extends AbstractService implements
     // Doubles as a mechanism to update node counts periodically. Hence schedulerId required.
 
     // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
-    int nodeCount = taskSchedulers[0].getClusterNodeCount();
+    int nodeCount = 0;
+    try {
+      nodeCount = taskSchedulers[0].getClusterNodeCount();
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler while getting node count"
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+      throw new RuntimeException(e);
+    }
     if (nodeCount != cachedNodeCount) {
       cachedNodeCount = nodeCount;
       sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
@@ -701,7 +851,17 @@ public class TaskSchedulerManager extends AbstractService implements
 
   public void dagCompleted() {
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      taskSchedulers[i].dagComplete();
+      try {  // per-scheduler try: a failure in one does not skip the remaining schedulers
+        taskSchedulers[i].dagComplete();
+      } catch (Exception e) {
+        String msg = "Error in TaskScheduler when notified for Dag Completion"
+            + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext);
+        LOG.error(msg, e);
+        sendEvent(  // reported as a fatal scheduler error; not rethrown, the loop continues
+            new DAGAppMasterEventUserServiceFatalError(
+                DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+                msg, e));
+      }
     }
   }
 
@@ -714,7 +874,18 @@ public class TaskSchedulerManager extends AbstractService implements
     // TODO Why is this making a call back into the scheduler, when the call is originating from there.
     // An AMContainer instance should already exist if an attempt is being made to preempt it
     AMContainer amContainer = appContext.getAllContainers().get(containerId);
-    taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
+    try {
+      taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler when preempting container"
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(amContainer.getTaskSchedulerIdentifier(), appContext)
+          + ", containerId=" + containerId;
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
     // Inform the Containers about completion.
     sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
         "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
@@ -725,7 +896,17 @@ public class TaskSchedulerManager extends AbstractService implements
     this.shouldUnregisterFlag.set(true);
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
       if (this.taskSchedulers[i] != null) {
-        this.taskSchedulers[i].setShouldUnregister();
+        try {
+          this.taskSchedulers[i].setShouldUnregister();
+        } catch (Exception e) {
+          String msg = "Error in TaskScheduler when setting Unregister Flag"
+              + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext);
+          LOG.error(msg, e);
+          sendEvent(
+              new DAGAppMasterEventUserServiceFatalError(
+                  DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+                  msg, e));
+        }
       }
     }
   }
@@ -737,7 +918,19 @@ public class TaskSchedulerManager extends AbstractService implements
   public boolean hasUnregistered() {
     boolean result = true;
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      result = result & this.taskSchedulers[i].hasUnregistered();
+      // Exceptions from the plugin are caught here and reported to the AM as a fatal error;
+      // there is no clear route to recover, and 'result' is left unchanged for that scheduler.
+      try {
+        result = result & this.taskSchedulers[i].hasUnregistered();
+      } catch (Exception e) {
+        String msg = "Error in TaskScheduler when checking if a scheduler has unregistered"
+            + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext);
+        LOG.error(msg, e);
+        sendEvent(
+            new DAGAppMasterEventUserServiceFatalError(
+                DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+                msg, e));
+      }
       if (result == false) {
         return result;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
new file mode 100644
index 0000000..43cf045
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+/** Forwards calls to a plugin {@link TaskScheduler}; forwarding methods declare throws Exception. */
+public class TaskSchedulerWrapper {
+  /** The wrapped plugin instance; assigned once in the constructor. */
+  private final TaskScheduler real;
+
+  public TaskSchedulerWrapper(TaskScheduler real) {
+    this.real = real;
+  }
+
+  public Resource getAvailableResources() throws Exception {
+    return real.getAvailableResources();
+  }
+
+  public Resource getTotalResources() throws Exception {
+    return real.getTotalResources();
+  }
+
+  public int getClusterNodeCount() throws Exception {
+    return real.getClusterNodeCount();
+  }
+
+  public void blacklistNode(NodeId nodeId) throws Exception {
+    real.blacklistNode(nodeId);
+  }
+
+  public void unblacklistNode(NodeId nodeId) throws Exception {
+    real.unblacklistNode(nodeId);
+  }
+
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                           Priority priority, Object containerSignature, Object clientCookie) throws
+      Exception {
+    real.allocateTask(task, capability, hosts, racks, priority, containerSignature, clientCookie);
+  }
+
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                           Priority priority, Object containerSignature, Object clientCookie) throws
+      Exception {
+    real.allocateTask(task, capability, containerId, priority, containerSignature, clientCookie);
+  }
+
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason,
+                                @Nullable String diagnostics) throws Exception {
+    return real.deallocateTask(task, taskSucceeded, endReason, diagnostics);
+  }
+
+  public Object deallocateContainer(ContainerId containerId) throws Exception {
+    return real.deallocateContainer(containerId);
+  }
+
+  public void setShouldUnregister() throws Exception {
+    real.setShouldUnregister();
+  }
+
+  public boolean hasUnregistered() throws Exception {
+    return real.hasUnregistered();
+  }
+
+  public void dagComplete() throws Exception {
+    real.dagComplete();
+  }
+  /** Returns the wrapped scheduler itself, for callers that need the raw plugin. */
+  public TaskScheduler getTaskScheduler() {
+    return real;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index d37d106..e4302aa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.rm.container;
 
+import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -27,7 +28,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tez.Utils;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -461,6 +465,29 @@ public class AMContainerImpl implements AMContainer {
         dagId = container.appContext.getCurrentDAG().getID();
         dagLocalResources = container.appContext.getCurrentDAG().getLocalResources();
       }
+
+      // TODO TEZ-2625 This should ideally be handled inside of user code. Will change once
+      // CLC construction moves into user code. For now, generating a user code error here
+      InetSocketAddress cAddress = null;
+      try {
+        cAddress =
+            container.taskCommunicatorManagerInterface.getTaskCommunicator(container.taskCommId).getAddress();
+      } catch (Exception e) {
+        String msg = "Error in TaskCommunicator when getting address"
+            + ", communicator=" + Utils.getTaskCommIdentifierString(container.taskCommId, container.appContext)
+            + ", containerId=" + container.containerId;
+        LOG.error(msg, e);
+        container.sendEvent(
+            new DAGAppMasterEventUserServiceFatalError(
+                DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+                msg, e));
+        // We have not registered with any of the listeners etc yet. Send out a deallocateContainer
+        // message and return. The AM will shutdown shortly.
+        container.inError = true;
+        container.deAllocate();
+        return;
+      }
+
       ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext(
           dagId, dagLocalResources,
           container.appContext.getApplicationACLs(),
@@ -468,7 +495,8 @@ public class AMContainerImpl implements AMContainer {
           containerContext.getLocalResources(),
           containerContext.getEnvironment(),
           containerContext.getJavaOpts(),
-          container.taskCommunicatorManagerInterface.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
+          cAddress,
+          containerContext.getCredentials(),
           container.appContext, container.container.getResource(),
           container.appContext.getAMConf());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b322e05..08f81fb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -193,7 +193,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     @Override
     public void start() throws Exception {
       taskCommunicatorManager = (TaskCommunicatorManager) getTaskCommunicatorManager();
-      taskCommunicator = (TezTaskCommunicatorImpl) taskCommunicatorManager.getTaskCommunicator(0);
+      taskCommunicator = (TezTaskCommunicatorImpl) taskCommunicatorManager.getTaskCommunicator(0).getTaskCommunicator();
       eventHandlingThread = new Thread(this);
       eventHandlingThread.start();
       ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java
new file mode 100644
index 0000000..fb6faa1
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Set;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PluginWrapperTestHelpers {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PluginWrapperTestHelpers.class);
+
+  public static void testDelegation(Class<?> delegateClass, Class<?> rawClass,
+                                    Set<String> skipMethods) throws Exception {
+    TrackingAnswer answer = new TrackingAnswer();
+    Object mock = mock(rawClass, answer);
+    Constructor ctor = delegateClass.getConstructor(rawClass);
+    Object wrapper = ctor.newInstance(mock);
+
+    // Run through all the methods on the wrapper, and invoke the methods. Constructs
+    // arguments and return types for each of them.
+    Method[] methods = delegateClass.getMethods();
+    for (Method method : methods) {
+      if (method.getDeclaringClass().equals(delegateClass) &&
+          !skipMethods.contains(method.getName())) {
+
+        assertTrue(method.getExceptionTypes().length == 1);
+        assertEquals(Exception.class, method.getExceptionTypes()[0]);
+
+        LOG.info("Checking method [{}] with parameterTypes [{}]", method.getName(), Arrays.toString(method.getParameterTypes()));
+
+        Object[] params = constructMethodArgs(method);
+        Object result = method.invoke(wrapper, params);
+
+        // Validate the correct arguments are forwarded, and the real instance is invoked.
+        assertEquals(method.getName(), answer.lastMethodName);
+        assertArrayEquals(params, answer.lastArgs);
+
+        // Validate the results.
+        // Handle auto-boxing
+        if (answer.compareAsPrimitive) {
+          assertEquals(answer.lastRetValue, result);
+        } else {
+          assertTrue("Expected: " + System.identityHashCode(answer.lastRetValue) + ", actual=" +
+              System.identityHashCode(result), answer.lastRetValue == result);
+        }
+      }
+    }
+
+
+  }
+
+  public static Object[] constructMethodArgs(Method method) throws IllegalAccessException,
+      InstantiationException {
+    Class<?>[] paramTypes = method.getParameterTypes();
+    Object[] params = new Object[paramTypes.length];
+    for (int i = 0; i < paramTypes.length; i++) {
+      params[i] = constructSingleArg(paramTypes[i]);
+    }
+    return params;
+  }
+
+  private static Object constructSingleArg(Class<?> clazz) {
+    if (clazz.isPrimitive() || clazz.equals(String.class)) {
+      return getValueForPrimitiveOrString(clazz);
+    } else if (clazz.isEnum()) {
+      if (clazz.getEnumConstants().length == 0) {
+        return null;
+      } else {
+        return clazz.getEnumConstants()[0];
+      }
+    } else if (clazz.isArray() &&
+        (clazz.getComponentType().isPrimitive() || clazz.getComponentType().equals(String.class))) {
+      // Cannot mock. For now using null. Also does not handle deeply nested arrays.
+      return null;
+    } else {
+      return mock(clazz);
+    }
+  }
+
+  private static Object getValueForPrimitiveOrString(Class<?> clazz) {
+    if (clazz.equals(String.class)) {
+      return "teststring";
+    } else if (clazz.equals(byte.class)) {
+      return (byte) 'b'; // must box to Byte; Method.invoke rejects a Character for a byte param
+    } else if (clazz.equals(short.class)) {
+      return (short) 2; // must box to Short; Method.invoke will not narrow an Integer to short
+    } else if (clazz.equals(int.class)) {
+      return 224;
+    } else if (clazz.equals(long.class)) {
+      return 445L;
+    } else if (clazz.equals(float.class)) {
+      return 2.24f;
+    } else if (clazz.equals(double.class)) {
+      return 4.57d;
+    } else if (clazz.equals(boolean.class)) {
+      return true;
+    } else if (clazz.equals(char.class)) {
+      return 'c';
+    } else if (clazz.equals(void.class)) {
+      return null; // void methods have no value to hand back
+    } else {
+      throw new RuntimeException("Unrecognized type: " + clazz.getName());
+    }
+  }
+
+  public static class TrackingAnswer implements Answer {
+
+    public String lastMethodName;
+    public Object[] lastArgs;
+    public Object lastRetValue;
+    boolean compareAsPrimitive;
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      lastArgs = invocation.getArguments();
+      lastMethodName = invocation.getMethod().getName();
+      Class<?> retType = invocation.getMethod().getReturnType();
+      lastRetValue = constructSingleArg(retType);
+      compareAsPrimitive = retType.isPrimitive() || retType.isEnum() || retType.equals(String.class);
+
+      return lastRetValue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index d1fd4f3..d76a5b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -21,12 +21,15 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
@@ -42,6 +45,8 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskCommunicator;
@@ -50,6 +55,9 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
@@ -57,6 +65,9 @@ import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestTaskCommunicatorManager {
 
@@ -215,8 +226,90 @@ public class TestTaskCommunicatorManager {
 
     } finally {
       tcm.stop();
-      verify(tcm.getTaskCommunicator(0)).shutdown();
-      verify(tcm.getTaskCommunicator(1)).shutdown();
+      verify(tcm.getTaskCommunicator(0).getTaskCommunicator()).shutdown();
+      verify(tcm.getTaskCommunicator(1).getTaskCommunicator()).shutdown();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testTaskCommunicatorUserError() {
+    TaskCommunicatorContextImpl taskCommContext = mock(TaskCommunicatorContextImpl.class);
+    TaskCommunicator taskCommunicator = mock(TaskCommunicator.class, new ExceptionAnswer());
+    doReturn(taskCommContext).when(taskCommunicator).getContext();
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+    when(appContext.getEventHandler()).thenReturn(eventHandler);
+    doReturn("testTaskCommunicator").when(appContext).getTaskCommunicatorName(0);
+    String expectedId = "[0:testTaskCommunicator]";
+
+    Configuration conf = new Configuration(false);
+
+    TaskCommunicatorManager taskCommunicatorManager =
+        new TaskCommunicatorManager(taskCommunicator, appContext, mock(TaskHeartbeatHandler.class),
+            mock(ContainerHeartbeatHandler.class));
+    try {
+      taskCommunicatorManager.init(conf);
+      taskCommunicatorManager.start();
+
+      // Invoking a couple of random methods.
+
+      DAG mockDag = mock(DAG.class, RETURNS_DEEP_STUBS);
+      when(mockDag.getID().getId()).thenReturn(1);
+
+      taskCommunicatorManager.dagComplete(mockDag);
+      ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+
+      Event rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      DAGAppMasterEventUserServiceFatalError event =
+          (DAGAppMasterEventUserServiceFatalError) rawEvent;
+
+      assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(event.getError().getMessage().contains("TestException_" + "dagComplete"));
+      assertTrue(event.getDiagnosticInfo().contains("DAG completion"));
+      assertTrue(event.getDiagnosticInfo().contains(expectedId));
+
+
+      when(appContext.getAllContainers().get(any(ContainerId.class)).getContainer().getNodeId())
+          .thenReturn(mock(NodeId.class));
+
+      taskCommunicatorManager.registerRunningContainer(mock(ContainerId.class), 0);
+      argumentCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(2)).handle(argumentCaptor.capture());
+
+      rawEvent = argumentCaptor.getAllValues().get(1);
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      event = (DAGAppMasterEventUserServiceFatalError) rawEvent;
+
+      assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(
+          event.getError().getMessage().contains("TestException_" + "registerRunningContainer"));
+      assertTrue(event.getDiagnosticInfo().contains("registering running Container"));
+      assertTrue(event.getDiagnosticInfo().contains(expectedId));
+
+
+    } finally {
+      taskCommunicatorManager.stop();
+    }
+
+  }
+
+  private static class ExceptionAnswer implements Answer {
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      Method method = invocation.getMethod();
+      if (method.getDeclaringClass().equals(TaskCommunicator.class) &&
+          !method.getName().equals("getContext") && !method.getName().equals("initialize") &&
+          !method.getName().equals("start") && !method.getName().equals("shutdown")) {
+        throw new RuntimeException("TestException_" + method.getName());
+      } else {
+        return invocation.callRealMethod();
+      }
     }
   }
 
@@ -353,7 +446,7 @@ public class TestTaskCommunicatorManager {
     }
 
     @Override
-    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 3f80928..2921a22 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -169,7 +169,8 @@ public class TestTaskCommunicatorManager1 {
   @Test(timeout = 5000)
   public void testGetTask() throws IOException {
 
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
+    TezTaskCommunicatorImpl taskCommunicator =
+        (TezTaskCommunicatorImpl) taskAttemptListener.getTaskCommunicator(0).getTaskCommunicator();
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
     ContainerId containerId1 = createContainerId(appId, 1);
@@ -216,7 +217,8 @@ public class TestTaskCommunicatorManager1 {
 
   @Test(timeout = 5000)
   public void testGetTaskMultiplePulls() throws IOException {
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
+    TezTaskCommunicatorImpl taskCommunicator =
+        (TezTaskCommunicatorImpl) taskAttemptListener.getTaskCommunicator(0).getTaskCommunicator();
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
     ContainerId containerId1 = createContainerId(appId, 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
new file mode 100644
index 0000000..212bca4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import com.google.common.collect.Sets;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.junit.Test;
+
+public class TestTaskCommunicatorWrapper {
+
+  @Test(timeout = 5000)
+  public void testDelegation() throws Exception {
+    PluginWrapperTestHelpers.testDelegation(TaskCommunicatorWrapper.class, TaskCommunicator.class,
+        Sets.newHashSet("getTaskCommunicator"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6dd578f..4772492 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -108,6 +109,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -1469,7 +1471,7 @@ public class TestTaskAttempt {
 
   @SuppressWarnings("deprecation")
   @Test(timeout = 5000)
-  public void testKilledInNew() {
+  public void testKilledInNew() throws ServicePluginException {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 0);
@@ -1609,11 +1611,12 @@ public class TestTaskAttempt {
         new Credentials(), new HashMap<String, String>(), "");
   }
 
-  private TaskCommunicatorManagerInterface createMockTaskAttemptListener() {
+  private TaskCommunicatorManagerInterface createMockTaskAttemptListener() throws
+      ServicePluginException {
     TaskCommunicatorManagerInterface taListener = mock(TaskCommunicatorManagerInterface.class);
     TaskCommunicator taskComm = mock(TaskCommunicator.class);
     doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    doReturn(new TaskCommunicatorWrapper(taskComm)).when(taListener).getTaskCommunicator(0);
     return taListener;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
index a8af808..1f75afb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -19,8 +19,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
@@ -35,20 +40,27 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +70,7 @@ public class TestContainerLauncherManager {
 
   @Before
   @After
-  public void reset() {
+  public void resetTest() {
     ContainerLaucherRouterForMultipleLauncherTest.reset();
   }
 
@@ -230,6 +242,73 @@ public class TestContainerLauncherManager {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testContainerLauncherUserError() throws ServicePluginException {
+
+    ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    AppContext appContext = mock(AppContext.class);
+    doReturn(eventHandler).when(appContext).getEventHandler();
+    doReturn("testlauncher").when(appContext).getContainerLauncherName(0);
+
+    Configuration conf = new Configuration(false);
+
+    ContainerLauncherManager containerLauncherManager =
+        new ContainerLauncherManager(containerLauncher, appContext);
+    try {
+      containerLauncherManager.init(conf);
+      containerLauncherManager.start();
+
+      // launch container
+      doThrow(new RuntimeException("testexception")).when(containerLauncher)
+          .launchContainer(any(ContainerLaunchRequest.class));
+      ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
+      Container container1 = mock(Container.class);
+      ContainerLauncherLaunchRequestEvent launchRequestEvent =
+          new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0);
+
+
+      containerLauncherManager.handle(launchRequestEvent);
+
+      ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+
+      Event rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      DAGAppMasterEventUserServiceFatalError event =
+          (DAGAppMasterEventUserServiceFatalError) rawEvent;
+      assertEquals(DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(event.getError().getMessage().contains("testexception"));
+      assertTrue(event.getDiagnosticInfo().contains("launching container"));
+      assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]"));
+
+      reset(eventHandler);
+      // stop container
+
+      doThrow(new RuntimeException("teststopexception")).when(containerLauncher)
+          .stopContainer(any(ContainerStopRequest.class));
+      ContainerId containerId2 = mock(ContainerId.class);
+      NodeId nodeId2 = mock(NodeId.class);
+      ContainerLauncherStopRequestEvent stopRequestEvent =
+          new ContainerLauncherStopRequestEvent(containerId2, nodeId2, null, 0, 0, 0);
+
+      argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+      containerLauncherManager.handle(stopRequestEvent);
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+      rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      event = (DAGAppMasterEventUserServiceFatalError) rawEvent;
+      assertTrue(event.getError().getMessage().contains("teststopexception"));
+      assertTrue(event.getDiagnosticInfo().contains("stopping container"));
+      assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]"));
+    } finally {
+      containerLauncherManager.stop();
+    }
+  }
+
   private static class ContainerLaucherRouterForMultipleLauncherTest
       extends ContainerLauncherManager {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java
new file mode 100644
index 0000000..d786bf9
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.collect.Sets;
+import org.apache.tez.dag.app.PluginWrapperTestHelpers;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.junit.Test;
+
+public class TestContainerLauncherWrapper {
+
+  @Test(timeout = 5000)
+  public void testDelegation() throws Exception {
+    PluginWrapperTestHelpers.testDelegation(ContainerLauncherWrapper.class, ContainerLauncher.class,
+        Sets.newHashSet("getContainerLauncher"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 0e90681..78dc8fd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1407,8 +1407,12 @@ public class TestContainerReuse {
   private void verifyDeAllocateTask(TaskScheduler taskScheduler, Object ta, boolean taskSucceeded,
                                     TaskAttemptEndReason endReason, String diagContains) {
     ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
-    verify(taskScheduler)
-        .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture());
+    try {
+      verify(taskScheduler)
+          .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
     assertEquals(1, argumentCaptor.getAllValues().size());
     if (diagContains == null) {
       assertNull(argumentCaptor.getValue());

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 8b489ea..b54d024 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -157,13 +157,15 @@ class TestTaskSchedulerHelpers {
           new TaskSchedulerContextImplWrapper(taskSchedulerContext,
               new CountingExecutorService(appCallbackExecutor));
       TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper);
-      taskSchedulers[0] =
-          new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync);
-      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
+
+      taskSchedulers[0] = new TaskSchedulerWrapper(
+          new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync));
+      taskSchedulerServiceWrappers[0] =
+          new ServicePluginLifecycleAbstractService(taskSchedulers[0].getTaskScheduler());
     }
 
     public TaskScheduler getSpyTaskScheduler() {
-      return taskSchedulers[0];
+      return taskSchedulers[0].getTaskScheduler();
     }
 
     @Override
@@ -172,7 +174,9 @@ class TestTaskSchedulerHelpers {
       // Init the service so that reuse configuration is picked up.
       ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
       ((AbstractService)taskSchedulerServiceWrappers[0]).start();
-      taskSchedulers[0] = spy(taskSchedulers[0]);
+      // For some reason, the spy needs to be set up after service startup.
+      taskSchedulers[0] = new TaskSchedulerWrapper(spy(taskSchedulers[0].getTaskScheduler()));
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 8e4e4f0..c649870 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -41,6 +42,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -68,8 +71,13 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.TaskCommunicatorManager;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
 import org.apache.tez.dag.app.dag.impl.TaskImpl;
 import org.apache.tez.dag.app.dag.impl.VertexImpl;
@@ -86,6 +94,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -94,6 +103,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @SuppressWarnings("rawtypes")
 public class TestTaskSchedulerManager {
@@ -121,8 +133,9 @@ public class TestTaskSchedulerManager {
     @Override
     protected void instantiateSchedulers(String host, int port, String trackingUrl,
                                          AppContext appContext) {
-      taskSchedulers[0] = mockTaskScheduler;
-      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);
+      taskSchedulers[0] = new TaskSchedulerWrapper(mockTaskScheduler);
+      taskSchedulerServiceWrappers[0] =
+          new ServicePluginLifecycleAbstractService<>(taskSchedulers[0].getTaskScheduler());
     }
     
     @Override
@@ -272,7 +285,7 @@ public class TestTaskSchedulerManager {
   }
   
   @Test (timeout = 5000)
-  public void testContainerInternalPreempted() throws IOException {
+  public void testContainerInternalPreempted() throws IOException, ServicePluginException {
     Configuration conf = new Configuration(false);
     schedulerHandler.init(conf);
     schedulerHandler.start();
@@ -533,6 +546,93 @@ public class TestTaskSchedulerManager {
         eq(launchRequest2));
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testTaskSchedulerUserError() { // checks that scheduler-plugin exceptions surface as fatal-error events
+    TaskScheduler taskScheduler = mock(TaskScheduler.class, new ExceptionAnswer()); // non-lifecycle calls throw
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+    when(appContext.getEventHandler()).thenReturn(eventHandler);
+    doReturn("testTaskScheduler").when(appContext).getTaskSchedulerName(0);
+    String expectedId = "[0:testTaskScheduler]"; // asserted below to appear in the diagnostics
+
+    Configuration conf = new Configuration(false);
+
+    InetSocketAddress address = new InetSocketAddress(15222);
+    DAGClientServer mockClientService = mock(DAGClientServer.class);
+    doReturn(address).when(mockClientService).getBindAddress();
+    TaskSchedulerManager taskSchedulerManager =
+        new TaskSchedulerManager(taskScheduler, appContext, mock(ContainerSignatureMatcher.class),
+            mockClientService,
+            Executors.newFixedThreadPool(1)) {
+          @Override
+          protected void instantiateSchedulers(String host, int port, String trackingUrl,
+                                               AppContext appContext) throws TezException {
+            // Stubbed out since these are set up up front in the constructor used for testing
+          }
+        };
+
+    try {
+      taskSchedulerManager.init(conf);
+      taskSchedulerManager.start();
+
+      // Invoking a couple of random methods
+
+      AMSchedulerEventTALaunchRequest launchRequest =
+          new AMSchedulerEventTALaunchRequest(mock(TezTaskAttemptID.class), mock(Resource.class),
+              mock(TaskSpec.class), mock(TaskAttempt.class), mock(TaskLocationHint.class), 0,
+              mock(ContainerContext.class), 0, 0, 0);
+      taskSchedulerManager.handleEvent(launchRequest); // expected to reach the throwing allocateTask
+
+      ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+      verify(eventHandler, times(1)).handle(argumentCaptor.capture()); // exactly one event so far
+
+      Event rawEvent = argumentCaptor.getValue();
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      DAGAppMasterEventUserServiceFatalError event =
+          (DAGAppMasterEventUserServiceFatalError) rawEvent;
+
+      assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(event.getError().getMessage().contains("TestException_" + "allocateTask"));
+      assertTrue(event.getDiagnosticInfo().contains("Task Allocation"));
+      assertTrue(event.getDiagnosticInfo().contains(expectedId));
+
+
+      taskSchedulerManager.dagCompleted(); // expected to reach the throwing dagComplete
+      argumentCaptor = ArgumentCaptor.forClass(Event.class);
+      verify(eventHandler, times(2)).handle(argumentCaptor.capture()); // cumulative count: two events
+
+      rawEvent = argumentCaptor.getAllValues().get(1); // second captured event, i.e. the one after dagCompleted()
+      assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+      event = (DAGAppMasterEventUserServiceFatalError) rawEvent;
+
+      assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, event.getType());
+      assertTrue(event.getError().getMessage().contains("TestException_" + "dagComplete"));
+      assertTrue(event.getDiagnosticInfo().contains("Dag Completion"));
+      assertTrue(event.getDiagnosticInfo().contains(expectedId));
+
+    } finally {
+      taskSchedulerManager.stop(); // stop the manager even when an assertion fails
+    }
+  }
+
+  private static class ExceptionAnswer implements Answer { // default Answer used for the TaskScheduler mock
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      Method invoked = invocation.getMethod();
+      String name = invoked.getName();
+      boolean exempt = name.equals("getContext") || name.equals("initialize")
+          || name.equals("start") || name.equals("shutdown");
+      if (exempt || !invoked.getDeclaringClass().equals(TaskScheduler.class)) {
+        return invocation.callRealMethod(); // exempt names and non-TaskScheduler methods run for real
+      }
+      throw new RuntimeException("TestException_" + name); // message carries the invoked method's name
+    }
+  }
+
   public static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager {
 
     private final TaskScheduler yarnTaskScheduler;

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java
new file mode 100644
index 0000000..cd8a496
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import com.google.common.collect.Sets;
+import org.apache.tez.dag.app.PluginWrapperTestHelpers;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.junit.Test;
+
+public class TestTaskSchedulerWrapper { // delegation test pairing TaskSchedulerWrapper with TaskScheduler
+
+  @Test(timeout = 5000)
+  public void testDelegation() throws Exception {
+    PluginWrapperTestHelpers.testDelegation(TaskSchedulerWrapper.class, TaskScheduler.class, // wrapper, wrapped type
+        Sets.newHashSet("getTaskScheduler")); // presumably method names the helper should skip - TODO confirm
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index cc88f0d..8b8b6d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,7 +63,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
@@ -1228,8 +1230,12 @@ public class TestAMContainer {
 
       tal = mock(TaskCommunicatorManagerInterface.class);
       TaskCommunicator taskComm = mock(TaskCommunicator.class);
-      doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-      doReturn(taskComm).when(tal).getTaskCommunicator(0);
+      try {
+        doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+      } catch (ServicePluginException e) {
+        throw new RuntimeException(e);
+      }
+      doReturn(new TaskCommunicatorWrapper(taskComm)).when(tal).getTaskCommunicator(0);
 
       dagID = TezDAGID.getInstance(applicationID, 1);
       vertexID = TezVertexID.getInstance(dagID, 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index 31d756c..e21dda1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
 
 public class TestAMContainerMap {
 
@@ -42,7 +43,7 @@ public class TestAMContainerMap {
     return mock(ContainerHeartbeatHandler.class);
   }
 
-  private TaskCommunicatorManagerInterface mockTaskAttemptListener() {
+  private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException {
     TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
     TaskCommunicator taskComm = mock(TaskCommunicator.class);
     doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index 186bacd..f9358bf 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -21,6 +21,7 @@ package org.apache.tez.examples;
 import java.io.IOException;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,7 +135,8 @@ public class JoinValidate extends TezExampleBase {
     return 0;
   }
 
-  private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
+  @VisibleForTesting
+  DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
       throws IOException {
     DAG dag = DAG.create(getDagName());
     if (getDefaultExecutionContext() != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
new file mode 100644
index 0000000..d489cca
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+
+public class TezTestServiceContainerLauncherWithErrors extends ContainerLauncher { // launcher that always fails
+  public TezTestServiceContainerLauncherWithErrors(
+      ContainerLauncherContext containerLauncherContext) {
+    super(containerLauncherContext); // context is only handed to the base class
+  }
+
+  @Override
+  public void launchContainer(ContainerLaunchRequest launchRequest) { // request is ignored
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void stopContainer(ContainerStopRequest stopRequest) { // request is ignored
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
new file mode 100644
index 0000000..1705eac
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+
+public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler { // scheduler whose mutating calls fail
+  public TezTestServiceTaskSchedulerServiceWithErrors(
+      TaskSchedulerContext taskSchedulerContext) {
+    super(taskSchedulerContext); // context is only handed to the base class
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    return Resource.newInstance(2048, 2); // fixed value; does not throw
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    return Resource.newInstance(2048, 2); // same fixed value as getAvailableResources()
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return 1; // fixed value; does not throw
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure (hosts/racks overload)
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+                           Priority priority, Object containerSignature, Object clientCookie) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure (containerId overload)
+  }
+
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason,
+                                @Nullable String diagnostics) {
+    throw new RuntimeException("Simulated Error"); // never returns a value
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    throw new RuntimeException("Simulated Error"); // never returns a value
+  }
+
+  @Override
+  public void setShouldUnregister() { // intentionally a no-op
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    return false; // constant; does not throw
+  }
+
+  @Override
+  public void dagComplete() { // intentionally a no-op; does not throw
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
new file mode 100644
index 0000000..0a3d8d4
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskcomm;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+
+public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator { // communicator whose calls mostly fail
+  public TezTestServiceTaskCommunicatorWithErrors(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext); // context is only handed to the base class
+  }
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+                                   @Nullable String diagnostics) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials, boolean credentialsChanged,
+                                         int priority) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+                                           TaskAttemptEndReason endReason,
+                                           @Nullable String diagnostics) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public InetSocketAddress getAddress() {
+    return NetUtils.createSocketAddrForHost("localhost", 0); // does not throw; address is localhost with port 0
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    throw new RuntimeException("Simulated Error"); // unconditional failure
+  }
+
+  @Override
+  public void dagComplete(int dagIdentifier) { // intentionally a no-op; does not throw
+  }
+
+  @Override
+  public Object getMetaInfo() {
+    throw new RuntimeException("Simulated Error"); // never returns a value
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
index f31476f..64b9063 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
@@ -15,6 +15,11 @@
 package org.apache.tez.examples;
 
 
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 
 public class JoinValidateConfigured extends JoinValidate {
@@ -60,4 +65,9 @@ public class JoinValidateConfigured extends JoinValidate {
   protected String getDagName() {
     return "JoinValidate_" + dagNameSuffix;
   }
+
+  @Override public DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
+      throws IOException { // re-declares the parent's createDag as public; arguments pass through unchanged
+    return super.createDag(tezConf, lhs, rhs, numPartitions); // pure delegation to JoinValidate
+  }
 }


Mime
View raw message