tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject git commit: TEZ-1689. Exception handling for EdgeManagerPlugin. (zjffdu)
Date Wed, 29 Oct 2014 01:55:53 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 5c2ad29de -> 0e2672dea


TEZ-1689. Exception handling for EdgeManagerPlugin. (zjffdu)

(cherry picked from commit f9588210a27bf1d142c8f1da8cd3d22b47d97e3b)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0e2672de
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0e2672de
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0e2672de

Branch: refs/heads/branch-0.5
Commit: 0e2672dea4c4c7b9cbfa67f066faf790b0b68ff5
Parents: 5c2ad29
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed Oct 29 09:53:55 2014 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Wed Oct 29 09:55:32 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/EdgeManagerPlugin.java   |  22 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   7 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  12 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  96 +++--
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  24 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  67 +--
 .../tez/dag/app/dag/impl/VertexManager.java     |  10 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 425 ++++++++++++++++++-
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |   5 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  55 ++-
 .../vertexmanager/TestShuffleVertexManager.java |   7 +-
 .../tez/test/TestExceptionPropagation.java      | 139 +++++-
 13 files changed, 735 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 39e3f16..329b475 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -64,6 +64,7 @@ ALL CHANGES:
   TEZ-1713. tez.lib.uris should not require the paths specified to be fully qualified.
   TEZ-1715. Fix use of import java.util.* in MultiMRInput.
   TEZ-1664. Add checks to ensure that the client and AM are compatible.
+  TEZ-1689. Exception handling for EdgeManagerPlugin.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
index 7e8d3ae..8768e7d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
@@ -58,25 +58,27 @@ public abstract class EdgeManagerPlugin {
    * EdgeManagerPlugin instance is created and setup by the user. The initialize
    * method will be called with the original {@link EdgeManagerPluginContext} when the
    * EdgeManagerPlugin is replaced.
-   *
+   * @throws Exception
    */
-  public abstract void initialize();
+  public abstract void initialize() throws Exception;
   
   /**
    * Get the number of physical inputs on the destination task
    * @param destinationTaskIndex Index of destination task for which number of 
    * inputs is needed
    * @return Number of physical inputs on the destination task
+   * @throws Exception
    */
-  public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex);
+  public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception;
 
   /**
    * Get the number of physical outputs on the source task
    * @param sourceTaskIndex Index of the source task for which number of outputs 
    * is needed
    * @return Number of physical outputs on the source task
+   * @throws Exception
    */
-  public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex);
+  public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception;
   
   /**
    * Return the routing information to inform consumers about the source task
@@ -94,10 +96,11 @@ public abstract class EdgeManagerPlugin {
    *          event
    * @param destinationTaskAndInputIndices
    *          Map via which the routing information is returned
+   * @throws Exception
    */
   public abstract void routeDataMovementEventToDestination(DataMovementEvent event,
       int sourceTaskIndex, int sourceOutputIndex,
-      Map<Integer, List<Integer>> destinationTaskAndInputIndices);
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;
   
   /**
    * Return the routing information to inform consumers about the failure of a
@@ -112,15 +115,17 @@ public abstract class EdgeManagerPlugin {
    *          Source task
    * @param destinationTaskAndInputIndices
    *          Map via which the routing information is returned
+   * @throws Exception
    */
   public abstract void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      Map<Integer, List<Integer>> destinationTaskAndInputIndices);
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;
 
   /**
    * Get the number of destination tasks that consume data from the source task
    * @param sourceTaskIndex Source task index
+   * @throws Exception
    */
-  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception;
   
   /**
    * Return the source task index to which to send the input error event
@@ -133,9 +138,10 @@ public abstract class EdgeManagerPlugin {
    *          Index of the physical input on the destination task that reported 
    *          the error
    * @return Index of the source task that created the unavailable input
+   * @throws Exception
    */
   public abstract int routeInputErrorEventToSource(InputReadErrorEvent event,
-      int destinationTaskIndex, int destinationFailedInputIndex);
+      int destinationTaskIndex, int destinationFailedInputIndex) throws Exception;
 
   /**
    * Return ahe {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 1a61707..cefd34d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezTaskID;
@@ -88,7 +89,7 @@ public interface Vertex extends Comparable<Vertex> {
 
   boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdate);
+      Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException;
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);
 
   // CHANGE THESE TO LISTS AND MAINTAIN ORDER?
@@ -110,8 +111,8 @@ public interface Vertex extends Comparable<Vertex> {
   public Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> 
     getAdditionalOutputs();
 
-  List<InputSpec> getInputSpecList(int taskIndex);
-  List<OutputSpec> getOutputSpecList(int taskIndex);
+  List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException;
+  List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException;
   
   List<GroupInputSpec> getGroupInputSpecList(int taskIndex);
   void addSharedOutputs(Set<String> outputs);

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 50b538d..d3aecd4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -1238,7 +1239,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     // Initialize the edges, now that the payload and vertices have been set.
     for (Edge e : edges.values()) {
-      e.initialize();
+      try {
+        e.initialize();
+      } catch (AMUserCodeException ex) {
+        String msg = "Exception in " + ex.getSource();
+        LOG.error(msg, ex);
+        addDiagnostic(msg + ", " + ex.getMessage() + ", "
+            + ExceptionUtils.getStackTrace(ex.getCause()));
+        finished(DAGState.FAILED);
+        return DAGState.FAILED;
+      }
     }
 
     assignDAGScheduler(this);

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 4cd0730..57d742f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -37,6 +37,7 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
@@ -148,9 +149,14 @@ public class Edge {
     }
   }
 
-  public void initialize() {
+  public void initialize() throws AMUserCodeException {
     if (edgeManager != null) {
-      edgeManager.initialize();
+      try {
+        edgeManager.initialize();
+      } catch (Exception e) {
+        throw new AMUserCodeException(Source.EdgeManager, "Fail to initialize Edge,"
+            + getEdgeInfo(), e);
+      }
     }
     destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, 
         destinationVertex.getName(), 
@@ -158,7 +164,8 @@ public class Edge {
         null);
   }
 
-  public synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) {
+  public synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
+      throws AMUserCodeException {
     EdgeProperty modifiedEdgeProperty =
         EdgeProperty.create(descriptor,
             edgeProperty.getDataSourceType(),
@@ -200,27 +207,39 @@ public class Edge {
     this.destinationVertex = destinationVertex;
   }
 
-  public InputSpec getDestinationSpec(int destinationTaskIndex) {
+  public InputSpec getDestinationSpec(int destinationTaskIndex) throws AMUserCodeException {
     Preconditions.checkState(edgeManager != null, 
         "Edge Manager must be initialized by this time");
-    return new InputSpec(sourceVertex.getName(),
-        edgeProperty.getEdgeDestination(),
-        edgeManager.getNumDestinationTaskPhysicalInputs(destinationTaskIndex));
+    try {
+      return new InputSpec(sourceVertex.getName(),
+          edgeProperty.getEdgeDestination(),
+          edgeManager.getNumDestinationTaskPhysicalInputs(destinationTaskIndex));
+    } catch (Exception e) {
+      throw new AMUserCodeException(Source.EdgeManager,
+          "Fail to getDestinationSpec, destinationTaskIndex="
+          + destinationTaskIndex +", " + getEdgeInfo(), e);
+    }
   }
 
-  public OutputSpec getSourceSpec(int sourceTaskIndex) {
+  public OutputSpec getSourceSpec(int sourceTaskIndex) throws AMUserCodeException {
     Preconditions.checkState(edgeManager != null, 
         "Edge Manager must be initialized by this time");
-    return new OutputSpec(destinationVertex.getName(),
-        edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskPhysicalOutputs(
-        sourceTaskIndex));
+    try {
+      return new OutputSpec(destinationVertex.getName(),
+          edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskPhysicalOutputs(
+          sourceTaskIndex));
+    } catch (Exception e) {
+      throw new AMUserCodeException(Source.EdgeManager,
+          "Fail to getSourceSpec, sourceTaskIndex="
+          + sourceTaskIndex + ", " + getEdgeInfo(), e);
+    }
   }
   
   public void startEventBuffering() {
     bufferEvents.set(true);
   }
   
-  public void stopEventBuffering() {
+  public void stopEventBuffering() throws AMUserCodeException {
     // assume only 1 entity will start and stop event buffering
     bufferEvents.set(false);
     for(TezEvent event : destinationEventBuffer) {
@@ -233,7 +252,7 @@ public class Edge {
     sourceEventBuffer.clear();
   }
   
-  public void sendTezEventToSourceTasks(TezEvent tezEvent) {
+  public void sendTezEventToSourceTasks(TezEvent tezEvent) throws AMUserCodeException {
     Preconditions.checkState(edgeManager != null, 
         "Edge Manager must be initialized by this time");
     if (!bufferEvents.get()) {
@@ -243,10 +262,21 @@ public class Edge {
         TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo()
             .getTaskAttemptID();
         int destTaskIndex = destAttemptId.getTaskID().getId();
-        int srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
-            destTaskIndex, event.getIndex());
-        int numConsumers = edgeManager.getNumDestinationConsumerTasks(
-            srcTaskIndex);
+        int srcTaskIndex;
+        int numConsumers;
+        try {
+          srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
+              destTaskIndex, event.getIndex());
+          numConsumers = edgeManager.getNumDestinationConsumerTasks(
+              srcTaskIndex);
+        } catch (Exception e) {
+          throw new AMUserCodeException(Source.EdgeManager,
+              "Fail to sendTezEventToSourceTasks, "
+              + "TezEvent:" + tezEvent.getEvent()
+              + "sourceInfo:" + tezEvent.getSourceInfo()
+              + "destinationInfo:" + tezEvent.getDestinationInfo()
+              + ", " + getEdgeInfo(), e);
+        }
         Task srcTask = sourceVertex.getTask(srcTaskIndex);
         if (srcTask == null) {
           throw new TezUncheckedException("Unexpected null task." +
@@ -273,7 +303,7 @@ public class Edge {
   }
   
 
-  private void handleCompositeDataMovementEvent(TezEvent tezEvent) {
+  private void handleCompositeDataMovementEvent(TezEvent tezEvent) throws AMUserCodeException {
     CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent) tezEvent.getEvent();
     EventMetaData srcInfo = tezEvent.getSourceInfo();
     
@@ -340,7 +370,7 @@ public class Edge {
     }
   }
   
-  public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
+  public void sendTezEventToDestinationTasks(TezEvent tezEvent) throws AMUserCodeException {
     Preconditions.checkState(edgeManager != null, 
         "Edge Manager must be initialized by this time");
     if (!bufferEvents.get()) {
@@ -371,14 +401,21 @@ public class Edge {
         }
 
         if (routingRequired) {
-          if (isDataMovementEvent) {
-            DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
-            edgeManager.routeDataMovementEventToDestination(dmEvent,
-                srcTaskIndex, dmEvent.getSourceIndex(),
-                destTaskAndInputIndices);
-          } else {
-            edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex,
-                destTaskAndInputIndices);
+          try {
+            if (isDataMovementEvent) {
+              DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
+              edgeManager.routeDataMovementEventToDestination(dmEvent,
+                  srcTaskIndex, dmEvent.getSourceIndex(),
+                  destTaskAndInputIndices);
+            } else {
+              edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex,
+                  destTaskAndInputIndices);
+            }
+          } catch (Exception e){
+            throw new AMUserCodeException(Source.EdgeManager,
+                "Fail to sendTezEventToDestinationTasks, event:" + tezEvent.getEvent()
+                + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:"
+                + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), e);
           }
         }
 
@@ -425,4 +462,9 @@ public class Edge {
         .append(" edgeManager=").append(edgeManager.getClass().getName())
         .append(" Event type=").append(tezEvent.getEventType()).toString();
   }
+
+  private String getEdgeInfo() {
+    return "EdgeInfo: sourceVertexName=" + getSourceVertexName() + ", destinationVertexName="
+        + getDestinationVertexName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index eab07a5..43be5aa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -71,6 +72,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
@@ -180,7 +182,7 @@ public class TaskAttemptImpl implements TaskAttempt,
             (TaskAttemptStateInternal.NEW)
 
       .addTransition(TaskAttemptStateInternal.NEW,
-          TaskAttemptStateInternal.START_WAIT,
+          EnumSet.of(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED),
           TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
       .addTransition(TaskAttemptStateInternal.NEW,
           TaskAttemptStateInternal.NEW,
@@ -501,7 +503,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     return getVertexID().getDAGId();
   }
 
-  TaskSpec createRemoteTaskSpec() {
+  TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
     Vertex vertex = getVertex();
     ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
     int taskId = getTaskID().getId();
@@ -1039,17 +1041,28 @@ public class TaskAttemptImpl implements TaskAttempt,
   //////////////////////////////////////////////////////////////////////////////
 
   protected static class ScheduleTaskattemptTransition implements
-      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
     @Override
-    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+    public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
 
       // TODO Creating the remote task here may not be required in case of
       // recovery.
 
       // Create the remote task.
-      TaskSpec remoteTaskSpec = ta.createRemoteTaskSpec();
+      TaskSpec remoteTaskSpec;
+      try {
+        remoteTaskSpec = ta.createRemoteTaskSpec();
+        LOG.info("remoteTaskSpec:" + remoteTaskSpec);
+      } catch (AMUserCodeException e) {
+        String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta.getTaskID();
+        LOG.error(msg, e);
+        String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
+        new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
+            new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag));
+        return TaskAttemptStateInternal.FAILED;
+      }
       // Create startTaskRequest
 
       String[] requestHosts = new String[0];
@@ -1096,6 +1109,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint,
           priority, ta.containerContext);
       ta.sendEvent(launchRequestEvent);
+      return TaskAttemptStateInternal.START_WAIT;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 38f242e..50e8b0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -166,6 +166,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
 
@@ -1170,8 +1171,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         } catch (AMUserCodeException e) {
           String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
           LOG.error(msg, e);
-          addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
-          // TODO add test for exception from EdgeManager in TEZ-1689
+          addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
           eventHandler.handle(new VertexEventTermination(vertexId, VertexTerminationCause.AM_USERCODE_FAILURE));
           return;
         }
@@ -1201,7 +1201,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Override
   public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates) {
+      Map<String, InputSpecUpdate> rootInputSpecUpdates) throws AMUserCodeException {
     return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
         false);
   }
@@ -1209,7 +1209,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       Map<String, InputSpecUpdate> rootInputSpecUpdates,
-      boolean recovering) {
+      boolean recovering) throws AMUserCodeException {
     if (recovering) {
       writeLock.lock();
       try {
@@ -1223,11 +1223,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             try {
               edge.setCustomEdgeManager(entry.getValue());
             } catch (Exception e) {
-              LOG.warn("Failed to initialize edge manager for edge"
-                  + ", sourceVertexName=" + sourceVertex.getName()
-                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
-                  e);
-              return false;
+              throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,"
+                  + "sourceVertex:" + edge.getSourceVertexName()
+                  + "destinationVertex:" + edge.getDestinationVertexName(), e);
             }
           }
         }
@@ -1272,11 +1270,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             try {
               edge.setCustomEdgeManager(entry.getValue());
             } catch (Exception e) {
-              LOG.warn("Failed to initialize edge manager for edge"
-                  + ", sourceVertexName=" + sourceVertex.getName()
-                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
-                  e);
-              return false;
+              throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,"
+                  + "sourceVertex:" + edge.getSourceVertexName()
+                  + "destinationVertex:" + edge.getDestinationVertexName(), e);
             }
           }
         }
@@ -1376,11 +1372,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             try {
               edge.setCustomEdgeManager(entry.getValue());
             } catch (Exception e) {
-              LOG.warn("Failed to initialize edge manager for edge"
-                  + ", sourceVertexName=" + sourceVertex.getName()
-                  + ", destinationVertexName=" + edge.getDestinationVertexName(),
-                  e);
-              return false;
+              throw new TezUncheckedException(e);
             }
           }
         }
@@ -1960,7 +1952,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
       LOG.error(msg, e);
       finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
-          msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
+          msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
       return VertexState.FAILED;
     }
 
@@ -2542,8 +2534,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             endState = VertexState.FAILED;
             break;
           }
-          if (!vertex.setParallelism(0,
-              null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true)) {
+          boolean successSetParallelism ;
+          try {
+            successSetParallelism = vertex.setParallelism(0,
+              null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true);
+          } catch (Exception e) {
+            successSetParallelism = false;
+          }
+          if (!successSetParallelism) {
             String msg  = "Failed to recover edge managers, vertex=" + vertex.logIdentifier;
             LOG.error(msg);
             vertex.finished(VertexState.FAILED,
@@ -2594,8 +2592,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             endState = VertexState.FAILED;
             break;
           }
-          if (!vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
-            vertex.recoveredRootInputSpecUpdates, true)) {
+          try {
+            successSetParallelism = vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
+              vertex.recoveredRootInputSpecUpdates, true);
+          } catch (Exception e) {
+            successSetParallelism = false;
+          }
+          if (!successSetParallelism) {
             String msg = "Failed to recover edge managers for vertex:" + vertex.logIdentifier;
             LOG.error(msg);
             vertex.finished(VertexState.FAILED,
@@ -2700,7 +2703,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
             LOG.error(msg, e);
             vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
-                msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
+                msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
             endState = VertexState.FAILED;
           }
         }
@@ -2995,7 +2998,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           " sent by vertex " + splitEvent.getSenderVertex() +
           " numTasks " + splitEvent.getNumTasks());
       vertex.originalOneToOneSplitSource = originalSplitSource;
-      vertex.setParallelism(splitEvent.getNumTasks(), null, null, null);
+      try {
+        vertex.setParallelism(splitEvent.getNumTasks(), null, null, null);
+      } catch (Exception e) {
+        // ignore this exception, should not happen
+        LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,"
+            + "exception should not happen here", e);
+      }
       if (vertex.getState() == VertexState.RUNNING ||
           vertex.getState() == VertexState.INITED) {
         return vertex.getState();
@@ -3501,7 +3510,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
         LOG.error(msg, e);
         if (vertex.getState() == VertexState.RUNNING) {
-          vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
+          vertex.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
           vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
           return VertexState.TERMINATING;
         } else {
@@ -3923,7 +3932,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   // TODO Eventually remove synchronization.
   @Override
-  public synchronized List<InputSpec> getInputSpecList(int taskIndex) {
+  public synchronized List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
     inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
         + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
     if (rootInputDescriptors != null) {
@@ -3948,7 +3957,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   // TODO Eventually remove synchronization.
   @Override
-  public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) {
+  public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
     if (this.outputSpecList == null) {
       outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
           + this.additionalOutputSpecs.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 892aea2..c5fe41f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -40,6 +40,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -110,8 +111,13 @@ public class VertexManager {
     public boolean setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
         Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
         Map<String, InputSpecUpdate> rootInputSpecUpdate) {
-      return managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
-          rootInputSpecUpdate);
+      try {
+        return managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
+            rootInputSpecUpdate);
+      } catch (AMUserCodeException e) {
+        // workaround: convert it to TezUncheckedException which would be caught in VM
+        throw new TezUncheckedException(e);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 9430526..a597d49 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -24,9 +24,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +43,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -50,6 +57,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -71,14 +80,13 @@ import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate.UpdateType;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
@@ -87,23 +95,35 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.TestDAGImpl.CustomizedEdgeManager.ExceptionLocation;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
+import org.apache.tez.dag.app.rm.AMSchedulerEvent;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 public class TestDAGImpl {
@@ -133,22 +153,33 @@ public class TestDAGImpl {
   private DAGPlan groupDagPlan;
   private DAGImpl groupDag;
   private TezDAGID groupDagId;
+  private DAGPlan dagPlanWithCustomEdge;
+  private DAGImpl dagWithCustomEdge;
+  private TezDAGID dagWithCustomEdgeId;
+  private AppContext dagWithCustomEdgeAppContext;
   private HistoryEventHandler historyEventHandler;
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
 
+  private DAGImpl chooseDAG(TezDAGID curDAGId) {
+    if (curDAGId.equals(dagId)) {
+      return dag;
+    } else if (curDAGId.equals(mrrDagId)) {
+      return mrrDag;
+    } else if (curDAGId.equals(groupDagId)) {
+      return groupDag;
+    } else if (curDAGId.equals(dagWithCustomEdgeId)) {
+      return dagWithCustomEdge;
+    } else {
+      throw new RuntimeException("Invalid event, unknown dag"
+          + ", dagId=" + curDAGId);
+    }
+  }
+  
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     @Override
     public void handle(DAGEvent event) {
-      if (event.getDAGId().equals(dagId)) {
-        dag.handle(event);
-      } else if (event.getDAGId().equals(mrrDagId)) {
-        mrrDag.handle(event);
-      } else if (event.getDAGId().equals(groupDagId)) {
-        groupDag.handle(event);
-      } else {
-        throw new RuntimeException("Invalid event, unknown dag"
-            + ", dagId=" + event.getDAGId());
-      }
+      DAGImpl dag = chooseDAG(event.getDAGId());
+      dag.handle(event);
     }
   }
 
@@ -163,7 +194,7 @@ public class TestDAGImpl {
     @Override
     public void handle(TaskEvent event) {
       TezDAGID id = event.getTaskID().getVertexID().getDAGId();
-      DAGImpl handler = id.equals(dagId) ? dag : (id.equals(mrrDagId) ? mrrDag : groupDag);
+      DAGImpl handler = chooseDAG(id);
       Vertex vertex = handler.getVertex(event.getTaskID().getVertexID());
       Task task = vertex.getTask(event.getTaskID());
       ((EventHandler<TaskEvent>)task).handle(event);
@@ -177,6 +208,18 @@ public class TestDAGImpl {
     }
   }
 
+  private class TaskAttemptEventDisptacher2 implements EventHandler<TaskAttemptEvent> {
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      TezDAGID id = event.getTaskAttemptID().getTaskID().getVertexID().getDAGId();
+      DAGImpl handler = chooseDAG(id);
+      Vertex vertex = handler.getVertex(event.getTaskAttemptID().getTaskID().getVertexID());
+      Task task = vertex.getTask(event.getTaskAttemptID().getTaskID());
+      TaskAttempt ta = task.getAttempt(event.getTaskAttemptID());
+      ((EventHandler<TaskAttemptEvent>)ta).handle(event);
+    }
+  }
+  
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -184,7 +227,7 @@ public class TestDAGImpl {
     @Override
     public void handle(VertexEvent event) {
       TezDAGID id = event.getVertexId().getDAGId();
-      DAGImpl handler = id.equals(dagId) ? dag : (id.equals(mrrDagId) ? mrrDag : groupDag);
+      DAGImpl handler = chooseDAG(id);
       Vertex vertex = handler.getVertex(event.getVertexId());
       ((EventHandler<VertexEvent>) vertex).handle(event);
     }
@@ -601,6 +644,75 @@ public class TestDAGImpl {
     return dag;
   }
 
+  private DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation) {
+    LOG.info("Setting up dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testverteximpl")
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex1")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host1")
+                .addRack("rack1")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(1)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x1.y1")
+                .build()
+                )
+            .addOutEdgeId("e1")
+            .build()
+            )
+          .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex2")
+            .setType(PlanVertexType.NORMAL)
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host2")
+                .addRack("rack2")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("foo")
+                .setTaskModule("x2.y2")
+                .build()
+                )
+            .addInEdgeId("e1")
+            .build()
+            )
+         .addEdge(
+            EdgePlan.newBuilder()
+            .setEdgeManager(TezEntityDescriptorProto.newBuilder()
+                .setClassName(CustomizedEdgeManager.class.getName())
+                .setUserPayload(ByteString.copyFromUtf8(exLocation.name()))
+                )
+            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+            .setInputVertexName("vertex1")
+            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+            .setOutputVertexName("vertex2")
+            .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+            .setId("e1")
+            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+            .build()
+            )
+          .build();
+    return dag;
+  }
+
   @Before
   public void setup() {
     conf = new Configuration();
@@ -675,6 +787,30 @@ public class TestDAGImpl {
     dag = null;
   }
 
+  private class AMSchedulerEventHandler implements EventHandler<AMSchedulerEvent> {
+    @Override
+    public void handle(AMSchedulerEvent event) {
+      // do nothing
+    }
+  }
+
+  private void setupDAGWithCustomEdge(ExceptionLocation exLocation) {
+    dagWithCustomEdgeId =  TezDAGID.getInstance(appAttemptId.getApplicationId(), 4);
+    dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation);
+    dagWithCustomEdgeAppContext = mock(AppContext.class);
+    doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
+    dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
+        dispatcher.getEventHandler(),  taskAttemptListener,
+        fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
+    doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();
+    doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG();
+    doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId();
+    doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID();
+    doReturn(historyEventHandler).when(dagWithCustomEdgeAppContext).getHistoryHandler();
+    dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
+    dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
+  }
+
   private void initDAG(DAGImpl impl) {
     impl.handle(
         new DAGEvent(impl.getID(), DAGEventType.DAG_INIT));
@@ -696,6 +832,14 @@ public class TestDAGImpl {
   }
 
   @Test(timeout = 5000)
+  public void testDAGInitFailed() {
+    setupDAGWithCustomEdge(ExceptionLocation.Initialize);
+    dagWithCustomEdge.handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
+  }
+
+  @Test(timeout = 5000)
   public void testDAGStart() {
     initDAG(dag);
     startDAG(dag);
@@ -743,6 +887,170 @@ public class TestDAGImpl {
   }
   
   @SuppressWarnings("unchecked")
+  @Test()
+  public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
+    setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+    VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    LOG.info(v2.getTasks().size());
+    Task t1= v2.getTask(0);
+    dispatcher.getEventHandler().handle(new TaskEvent(t1.getTaskId(), TaskEventType.T_SCHEDULE));
+    dispatcher.await();
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+
+    Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
+    String diag = StringUtils.join(ta1.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationTaskPhysicalInputs.name()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test()
+  public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
+    setupDAGWithCustomEdge(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
+
+    VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+    Task t1= v1.getTask(0);
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+
+    Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
+    String diag = StringUtils.join(ta1.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.GetNumSourceTaskPhysicalOutputs.name()));
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test()
+  public void testEdgeManager_RouteDataMovementEventToDestination() {
+    setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+    VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    dispatcher.await();
+    Task t1= v2.getTask(0);
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+
+    DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
+    TezEvent tezEvent = new TezEvent(daEvent, 
+        new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(VertexState.KILLED, v1.getState());
+    String diag = StringUtils.join(v2.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test()
+  public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() {
+    setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+    VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    dispatcher.await();
+
+    Task t1= v2.getTask(0);
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+    InputFailedEvent ifEvent = InputFailedEvent.create(0, 1);
+    TezEvent tezEvent = new TezEvent(ifEvent, 
+        new EventMetaData(EventProducerConsumerType.INPUT,"vertex1", "vertex2", ta1.getID()));
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+    // 
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(VertexState.KILLED, v1.getState());
+    String diag = StringUtils.join(v2.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name()));
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test()
+  public void testEdgeManager_GetNumDestinationConsumerTasks() {
+    setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+    VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    dispatcher.await();
+
+    Task t1= v2.getTask(0);
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+
+    InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
+    TezEvent tezEvent = new TezEvent(ireEvent, 
+        new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getID()));
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+    // 
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(VertexState.KILLED, v1.getState());
+    String diag = StringUtils.join(v2.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationConsumerTasks.name()));
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test()
+  public void testEdgeManager_RouteInputErrorEventToSource() {
+    setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+    VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    dispatcher.await();
+
+    Task t1= v2.getTask(0);
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+    InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
+    TezEvent tezEvent = new TezEvent(ireEvent, 
+        new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getID()));
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+    // 
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(VertexState.KILLED, v1.getState());
+    String diag = StringUtils.join(v2.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.RouteInputErrorEventToSource.name()));
+  }
+  
+  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testGroupDAGCompletionWithCommitSuccess() {
     // should have only 2 commits. 1 vertex3 commit and 1 group commit.
@@ -1212,4 +1520,93 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  public static class CustomizedEdgeManager extends EdgeManagerPlugin {
+
+    public static enum ExceptionLocation {
+      Initialize,
+      GetNumDestinationTaskPhysicalInputs,
+      GetNumSourceTaskPhysicalOutputs,
+      RouteDataMovementEventToDestination,
+      RouteInputSourceTaskFailedEventToDestination,
+      GetNumDestinationConsumerTasks,
+      RouteInputErrorEventToSource
+    }
+
+    private ExceptionLocation exLocation;
+
+    public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
+      return EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName())
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
+    }
+
+    public CustomizedEdgeManager(EdgeManagerPluginContext context) {
+      super(context);
+      this.exLocation = ExceptionLocation.valueOf(
+          new String(context.getUserPayload().deepCopyAsArray()));
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      if (exLocation == ExceptionLocation.Initialize) {
+        throw new Exception(exLocation.name());
+      }
+    }
+
+    @Override
+    public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+
+    @Override
+    public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+
+    @Override
+    public void routeDataMovementEventToDestination(DataMovementEvent event,
+        int sourceTaskIndex, int sourceOutputIndex,
+        Map<Integer, List<Integer>> destinationTaskAndInputIndices)
+        throws Exception {
+      if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
+        throw new Exception(exLocation.name());
+      }
+    }
+
+    @Override
+    public void routeInputSourceTaskFailedEventToDestination(
+        int sourceTaskIndex,
+        Map<Integer, List<Integer>> destinationTaskAndInputIndices)
+        throws Exception {
+      if (exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
+        throw new Exception(exLocation.name());
+      }
+    }
+
+    @Override
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+
+    @Override
+    public int routeInputErrorEventToSource(InputReadErrorEvent event,
+        int destinationTaskIndex, int destinationFailedInputIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index d3f24e5..31c8064 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -30,12 +30,9 @@ import static org.mockito.Mockito.verify;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -62,7 +59,7 @@ public class TestEdge {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test (timeout = 5000)
-  public void testCompositeEventHandling() {
+  public void testCompositeEventHandling() throws AMUserCodeException {
     EventHandler eventHandler = mock(EventHandler.class);
     EdgeProperty edgeProp = EdgeProperty.create(DataMovementType.SCATTER_GATHER,
         DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, mock(OutputDescriptor.class),

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 2489b1b..72dcd92 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -30,7 +30,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -2112,7 +2111,7 @@ public class TestVertexImpl {
         anyInt());
   }
 
-  public void setupPostDagCreation() {
+  public void setupPostDagCreation() throws AMUserCodeException {
     String dagName = "dag0";
     dispatcher = new DrainDispatcher();
     appContext = mock(AppContext.class);
@@ -2197,7 +2196,7 @@ public class TestVertexImpl {
   }
   
   @Before
-  public void setup() {
+  public void setup() throws AMUserCodeException {
     useCustomInitializer = false;
     customInitializer = null;
     setupPreDagCreation();
@@ -2270,7 +2269,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexInit() {
+  public void testVertexInit() throws AMUserCodeException {
     initAllVertices(VertexState.INITED);
 
     VertexImpl v3 = vertices.get("vertex3");
@@ -2327,7 +2326,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexSetParallelism() {
+  public void testVertexSetParallelism() throws Exception {
     initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
     Assert.assertEquals(2, v3.getTotalTasks());
@@ -2392,7 +2391,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testSetCustomEdgeManager() throws UnsupportedEncodingException {
+  public void testSetCustomEdgeManager() throws Exception {
     initAllVertices(VertexState.INITED);
     Edge edge = edges.get("e4");
     EdgeManagerPlugin em = edge.getEdgeManager();
@@ -3026,7 +3025,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexInitWithCustomVertexManager() {
+  public void testVertexInitWithCustomVertexManager() throws Exception {
     setupPreDagCreation();
     dagPlan = createDAGWithCustomVertexManager();
     setupPostDagCreation();
@@ -3074,7 +3073,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexManagerHeuristic() {
+  public void testVertexManagerHeuristic() throws AMUserCodeException {
     setupPreDagCreation();
     dagPlan = createDAGPlanWithMixedEdges();
     setupPostDagCreation();
@@ -3095,7 +3094,7 @@ public class TestVertexImpl {
 
 
   @Test(timeout = 5000)
-  public void testVertexWithOneToOneSplit() {
+  public void testVertexWithOneToOneSplit() throws AMUserCodeException {
     // create a diamond shaped dag with 1-1 edges. 
     // split the source and remaining vertices should split equally
     // vertex with 2 incoming splits from the same source should split once
@@ -3261,7 +3260,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexWithInitializerFailure() {
+  public void testVertexWithInitializerFailure() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -3302,7 +3301,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 10000)
-  public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException {
+  public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, AMUserCodeException {
     useCustomInitializer = true;
     customInitializer = new RootInitializerSettingParallelismTo0(null);
     RootInitializerSettingParallelismTo0 initializer =
@@ -3838,7 +3837,7 @@ public class TestVertexImpl {
    * If broadcast, one-to-one or custom edges are present in source, tasks should not start until
    * 1 task from each source vertex is complete.
    */
-  public void testTaskSchedulingWithCustomEdges() {
+  public void testTaskSchedulingWithCustomEdges() throws AMUserCodeException {
     setupPreDagCreation();
     dagPlan = createCustomDAGWithCustomEdges();
     setupPostDagCreation();
@@ -4178,7 +4177,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexWithMultipleInitializers1() {
+  public void testVertexWithMultipleInitializers1() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
@@ -4205,7 +4204,7 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexWithMultipleInitializers2() {
+  public void testVertexWithMultipleInitializers2() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
@@ -4233,7 +4232,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 500000)
-  public void testVertexWithInitializerSuccess() {
+  public void testVertexWithInitializerSuccess() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4308,7 +4307,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexWithInputDistributor() {
+  public void testVertexWithInputDistributor() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer");
@@ -4343,7 +4342,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexRootInputSpecUpdateAll() {
+  public void testVertexRootInputSpecUpdateAll() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4374,7 +4373,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testVertexRootInputSpecUpdatePerTask() {
+  public void testVertexRootInputSpecUpdatePerTask() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4651,7 +4650,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout=5000)
-  public void testVertexGroupInput() {
+  public void testVertexGroupInput() throws AMUserCodeException {
     setupPreDagCreation();
     dagPlan = createVertexGroupDAGPlan();
     setupPostDagCreation();
@@ -4679,7 +4678,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testStartWithUninitializedCustomEdge() {
+  public void testStartWithUninitializedCustomEdge() throws Exception {
     // Race when a source vertex manages to start before the target vertex has
     // been initialized
     setupPreDagCreation();
@@ -4726,7 +4725,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testInitStartRace() {
+  public void testInitStartRace() throws AMUserCodeException {
     // Race when a source vertex manages to start before the target vertex has
     // been initialized
     setupPreDagCreation();
@@ -4750,7 +4749,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testInitStartRace2() {
+  public void testInitStartRace2() throws AMUserCodeException {
     // Race when a source vertex manages to start before the target vertex has
     // been initialized
     setupPreDagCreation();
@@ -4778,7 +4777,7 @@ public class TestVertexImpl {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_Initialize() {
+  public void testExceptionFromVM_Initialize() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.Initialize);
@@ -4798,7 +4797,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnRootVertexInitialized() {
+  public void testExceptionFromVM_OnRootVertexInitialized() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnRootVertexInitialized);
@@ -4823,7 +4822,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnVertexStarted() {
+  public void testExceptionFromVM_OnVertexStarted() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexStarted);
@@ -4851,7 +4850,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnSourceTaskCompleted() {
+  public void testExceptionFromVM_OnSourceTaskCompleted() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnSourceTaskCompleted);
@@ -4888,7 +4887,7 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testExceptionFromVM_OnVertexManagerEventReceived() {
+  public void testExceptionFromVM_OnVertexManagerEventReceived() throws AMUserCodeException {
     useCustomInitializer = true;
     setupPreDagCreation();
     dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexManagerEventReceived);
@@ -4914,7 +4913,7 @@ public class TestVertexImpl {
   }
   
   @Test(timeout = 5000)
-  public void testExceptionFromII_Initialize() {
+  public void testExceptionFromII_Initialize() throws AMUserCodeException {
     useCustomInitializer = true;
     customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize);
     EventHandlingRootInputInitializer initializer =

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index d967122..6e22194 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.library.vertexmanager;
 
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
@@ -66,7 +67,7 @@ public class TestShuffleVertexManager {
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test(timeout = 5000)
-  public void testShuffleVertexManagerAutoParallelism() throws IOException {
+  public void testShuffleVertexManagerAutoParallelism() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean(
         ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
@@ -166,7 +167,7 @@ public class TestShuffleVertexManager {
         new HashMap<String, EdgeManagerPlugin>();
 
     doAnswer(new Answer() {
-      public Object answer(InvocationOnMock invocation) {
+      public Object answer(InvocationOnMock invocation) throws Exception {
           when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
           newEdgeManagers.clear();
           for (Entry<String, EdgeManagerPluginDescriptor> entry :
@@ -698,7 +699,7 @@ public class TestShuffleVertexManager {
     final Map<String, EdgeManagerPlugin> edgeManagerR2 =
         new HashMap<String, EdgeManagerPlugin>();
     doAnswer(new Answer() {
-      public Object answer(InvocationOnMock invocation) {
+      public Object answer(InvocationOnMock invocation) throws Exception {
         when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(2);
         edgeManagerR2.clear();
         for (Entry<String, EdgeManagerPluginDescriptor> entry :

http://git-wip-us.apache.org/repos/asf/tez/blob/0e2672de/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 44b557b..eef6ab3 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -43,11 +43,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -63,6 +63,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
+import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
@@ -77,13 +78,18 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.test.TestAMRecovery.DoNothingProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestExceptionPropagation {
 
   private static final Log LOG = LogFactory
@@ -235,7 +241,7 @@ public class TestExceptionPropagation {
       startMiniTezCluster();
       startNonSessionClient();
 
-      ExceptionLocation exLocation = ExceptionLocation.INPUT_START;
+      ExceptionLocation exLocation = ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs;
       LOG.info("NonSession mode, Test for Exception from:" + exLocation.name());
       DAG dag = createDAG(exLocation);
       DAGClient dagClient = tezClient.submitDAG(dag);
@@ -296,6 +302,13 @@ public class TestExceptionPropagation {
     // VM
     VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED, VM_ON_VERTEX_STARTED,
     VM_ON_VERTEXMANAGEREVENT_RECEIVED,
+
+    // EdgeManager
+    EM_Initialize, EM_GetNumDestinationTaskPhysicalInputs, EM_GetNumSourceTaskPhysicalOutputs,
+    EM_RouteDataMovementEventToDestination, EM_GetNumDestinationConsumerTasks,
+    EM_RouteInputErrorEventToSource,
+    // Not Supported yet
+    // EM_RouteInputSourceTaskFailedEventToDestination,
   }
 
   /**
@@ -317,17 +330,20 @@ public class TestExceptionPropagation {
         InputInitializerWithException.getIIDesc(payload);
     v1.addDataSource("input",
         DataSourceDescriptor.create(inputDesc, iiDesc, null));
-    OutputDescriptor outputDesc = OutputWithException.getOutputDesc(payload);
-    v1.addDataSink("output", DataSinkDescriptor.create(outputDesc, null, null));
     v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));
 
     Vertex v2 = 
-        Vertex.create("v2", DoNothingProcessor.getProcDesc(), 1);
+        Vertex.create("v2", 
+            ProcessorDescriptor.create(SleepProcessor.class.getName())
+              .setUserPayload(new SleepProcessorConfig(3).toUserPayload())
+            , 1);
     v2.setVertexManagerPlugin(ShuffleVertexManagerWithException.getVMDesc(exLocation));
 
     dag.addVertex(v1)
       .addVertex(v2)
-      .addEdge(Edge.create(v1, v2, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+      .addEdge(Edge.create(v1, v2, EdgeProperty.create(
+        EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName())
+          .setUserPayload(payload),
         DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
         OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
     return dag;
@@ -364,6 +380,7 @@ public class TestExceptionPropagation {
     }
   }
 
+  // input of vertex2
   public static class InputWithException extends AbstractLogicalInput {
 
     private ExceptionLocation exLocation;
@@ -419,6 +436,10 @@ public class TestExceptionPropagation {
       getContext().requestInitialMemory(0l, null); // mandatory call
       if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) {
         throw new Exception(this.exLocation.name());
+      } else if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource
+          || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
+        Event errorEvent = InputReadErrorEvent.create("read error", 0, 0);
+        return Lists.newArrayList(errorEvent);
       }
       return null;
     }
@@ -429,6 +450,7 @@ public class TestExceptionPropagation {
     }
   }
 
+  // output of vertex1
   public static class OutputWithException extends AbstractLogicalOutput {
 
     private ExceptionLocation exLocation;
@@ -470,7 +492,13 @@ public class TestExceptionPropagation {
         List<Event> events = new ArrayList<Event>();
         events.add(VertexManagerEvent.create("v2", ByteBuffer.wrap(new byte[0])));
         return events;
-      } else {
+      } else if (this.exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
+        // send DataMovementEvent to v2
+        List<Event> events = new ArrayList<Event>();
+        events.add(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])));
+        return events;
+      }
+      else {
         return null;
       }
     }
@@ -508,10 +536,11 @@ public class TestExceptionPropagation {
       input.start();
       input.getReader();
 
-      OutputWithException output = (OutputWithException) outputs.get("output");
+      OutputWithException output = (OutputWithException) outputs.get("v2");
       output.start();
       output.getWriter();
 
+      Thread.sleep(3*1000);
       if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_ERROR) {
         throw new Error(this.exLocation.name());
       } else if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION) {
@@ -600,7 +629,14 @@ public class TestExceptionPropagation {
 
     @Override
     public void initialize() {
-      super.initialize();
+      try {
+        super.initialize();
+      } catch (TezUncheckedException e) {
+        // workaround for testing
+        if (!e.getMessage().equals("Atleast 1 bipartite source should exist")) {
+          throw e;
+        }
+      }
       Configuration conf;
       try {
         conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
@@ -634,4 +670,85 @@ public class TestExceptionPropagation {
               .setUserPayload(payload);
     }
   }
+
+  public static class CustomEdgeManager extends ScatterGatherEdgeManager {
+
+    private ExceptionLocation exLocation;
+
+    public CustomEdgeManager(EdgeManagerPluginContext context) {
+      super(context);
+      this.exLocation =
+          ExceptionLocation.valueOf(new String(context.getUserPayload()
+              .deepCopyAsArray()));
+    }
+
+    @Override
+    public void initialize() {
+      if (exLocation == ExceptionLocation.EM_Initialize) {
+        throw new RuntimeException(exLocation.name());
+      }
+      try {
+        super.initialize();
+      } catch (TezUncheckedException e) {
+        // workaround for testing
+        if (!e.getMessage().equals("Atleast 1 bipartite source should exist")) {
+          throw e;
+        }
+      }
+    }
+
+    @Override
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+      if (exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
+        throw new RuntimeException(exLocation.name());
+      }
+      return super.getNumDestinationConsumerTasks(sourceTaskIndex);
+    }
+
+    @Override
+    public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
+      if (exLocation == ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs) {
+        throw new RuntimeException(exLocation.name());
+      }
+      LOG.info("ExLocation:" + exLocation);
+      return super.getNumSourceTaskPhysicalOutputs(sourceTaskIndex);
+    }
+
+    @Override
+    public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+      if (exLocation == ExceptionLocation.EM_GetNumDestinationTaskPhysicalInputs) {
+        throw new RuntimeException(exLocation.name());
+      }
+      return super.getNumDestinationTaskPhysicalInputs(destinationTaskIndex);
+    }
+
+    @Override
+    public void routeDataMovementEventToDestination(DataMovementEvent event,
+        int sourceTaskIndex, int sourceOutputIndex,
+        Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+      if (exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
+        throw new RuntimeException(exLocation.name());
+      }
+      super.routeDataMovementEventToDestination(event, sourceTaskIndex,
+          sourceOutputIndex, destinationTaskAndInputIndices);
+    }
+
+    @Override
+    public int routeInputErrorEventToSource(InputReadErrorEvent event,
+        int destinationTaskIndex, int destinationFailedInputIndex) {
+      if (exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource) {
+        throw new RuntimeException(exLocation.name());
+      }
+      return super.routeInputErrorEventToSource(event, destinationTaskIndex,
+          destinationFailedInputIndex);
+    }
+
+    @Override
+    public void routeInputSourceTaskFailedEventToDestination(
+        int sourceTaskIndex,
+        Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+      super.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex,
+          destinationTaskAndInputIndices);
+    }
+  }
 }


Mime
View raw message