tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-748. Test Tez Fault Tolerance (bikas)
Date Tue, 28 Jan 2014 21:00:18 GMT
Updated Branches:
  refs/heads/master a5d481178 -> 25c32021e


TEZ-748. Test Tez Fault Tolerance (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/25c32021
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/25c32021
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/25c32021

Branch: refs/heads/master
Commit: 25c32021e0646bc48a79676b5a5000606ac0aeed
Parents: a5d4811
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Jan 28 13:00:04 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Jan 28 13:00:04 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezSession.java  |   5 +
 .../runtime/api/events/DataMovementEvent.java   |   2 +
 .../runtime/api/events/InputFailedEvent.java    |  17 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  31 +-
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |   4 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   1 +
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  21 +-
 .../app/dag/impl/ScatterGatherEdgeManager.java  |   9 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  27 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   7 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  14 +-
 .../apache/tez/runtime/api/impl/TezEvent.java   |  43 ++-
 .../shuffle/impl/ShuffleInputEventHandler.java  |   2 +-
 .../org/apache/tez/test/TestFaultTolerance.java | 361 +++++++++++++++++++
 .../java/org/apache/tez/test/TestInput.java     | 276 ++++++++++++++
 .../java/org/apache/tez/test/TestOutput.java    |  71 ++++
 .../java/org/apache/tez/test/TestProcessor.java | 155 ++++++++
 17 files changed, 991 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index b02d345..e0cbc93 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import com.google.protobuf.ServiceException;
 
 public class TezSession {
@@ -141,6 +142,10 @@ public class TezSession {
         .createEnvironment(sessionConfig.getYarnConfiguration());
     for (Vertex v : dag.getVertices()) {
       Map<String, String> taskEnv = v.getTaskEnvironment();
+      if (taskEnv == null) {
+        taskEnv = Maps.newHashMap();
+        v.setTaskEnvironment(taskEnv);
+      }
       for (Map.Entry<String, String> entry : environment.entrySet()) {
         String key = entry.getKey();
         String value = entry.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
index e5f9b05..9865117 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
@@ -66,9 +66,11 @@ public final class DataMovementEvent extends Event {
   @Private
   public DataMovementEvent(int sourceIndex,
       int targetIndex,
+      int version,
       byte[] userPayload) {
     this.userPayload = userPayload;
     this.sourceIndex = sourceIndex;
+    this.version = version;
     this.targetIndex = targetIndex;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
index 8469bb8..fb44462 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
@@ -33,7 +33,7 @@ public class InputFailedEvent extends Event{
    * Index(i) of the i-th (physical) Input or Output that generated the data.
    * For a Processor-generated event, this is ignored.
    */
-  private final int sourceIndex;
+  private int sourceIndex;
 
   /**
    * Index(i) of the i-th (physical) Input or Output that is meant to receive
@@ -45,17 +45,12 @@ public class InputFailedEvent extends Event{
    * Version number to indicate what attempt generated this Event
    */
   private int version;
-
-  /**
-   * User Event constructor
-   * @param sourceIndex Index to identify the physical edge of the input/output
-   * that generated the event
-   */
-  public InputFailedEvent(int sourceIndex) {
-    this.sourceIndex = sourceIndex;
-  }
-
+  
   @Private
+  public InputFailedEvent() {
+  }
+  
+  @Private // for Writable
   public InputFailedEvent(int sourceIndex,
       int targetIndex,
       int version) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 35e8e62..8463dec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -99,6 +99,7 @@ import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
@@ -1289,7 +1290,12 @@ public class DAGAppMaster extends AbstractService {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(DAGEvent event) {
-      ((EventHandler<DAGEvent>)context.getCurrentDAG()).handle(event);
+      DAG dag = context.getCurrentDAG();
+      int eventDagIndex = event.getDAGId().getId();
+      if (dag == null || eventDagIndex != dag.getID().getId()) {
+        return; // event not relevant any more
+      }
+      ((EventHandler<DAGEvent>)dag).handle(event);
     }
   }
 
@@ -1297,8 +1303,15 @@ public class DAGAppMaster extends AbstractService {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskEvent event) {
+      DAG dag = context.getCurrentDAG();
+      int eventDagIndex = 
+          event.getTaskID().getVertexID().getDAGId().getId();
+      if (dag == null || eventDagIndex != dag.getID().getId()) {
+        return; // event not relevant any more
+      }
+      
       Task task =
-          context.getCurrentDAG().getVertex(event.getTaskID().getVertexID()).
+          dag.getVertex(event.getTaskID().getVertexID()).
               getTask(event.getTaskID());
       ((EventHandler<TaskEvent>)task).handle(event);
     }
@@ -1310,6 +1323,11 @@ public class DAGAppMaster extends AbstractService {
     @Override
     public void handle(TaskAttemptEvent event) {
       DAG dag = context.getCurrentDAG();
+      int eventDagIndex = 
+          event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+      if (dag == null || eventDagIndex != dag.getID().getId()) {
+        return; // event not relevant any more
+      }
       Task task =
           dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
               getTask(event.getTaskAttemptID().getTaskID());
@@ -1324,7 +1342,13 @@ public class DAGAppMaster extends AbstractService {
     @Override
     public void handle(VertexEvent event) {
       DAG dag = context.getCurrentDAG();
-      org.apache.tez.dag.app.dag.Vertex vertex =
+      int eventDagIndex = 
+          event.getVertexId().getDAGId().getId();
+      if (dag == null || eventDagIndex != dag.getID().getId()) {
+        return; // event not relevant any more
+      }
+      
+      Vertex vertex =
           dag.getVertex(event.getVertexId());
       ((EventHandler<VertexEvent>) vertex).handle(event);
     }
@@ -1476,6 +1500,7 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
+    LOG.info("Running DAG: " + dagPlan.getName());
     // Job name is the same as the app name until we support multiple dags
     // for an app later
     appName = dagPlan.getName();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 40a8872..5e5314e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -65,9 +65,9 @@ public class BroadcastEdgeManager implements EdgeManager {
     return event.getIndex();
   }
   
-  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndeces) {
+  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
     for(int i=0; i<numDestinationTasks; ++i) {
-      taskIndeces.add(new Integer(i));
+      taskIndices.add(new Integer(i));
     }    
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index d116b46..16ae914 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -879,6 +879,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     
     eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
 
+    LOG.info("DAG: " + getID() + " finished with state: " + finalState);
     return finalState;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 7637d5f..e8ec81a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -243,17 +242,17 @@ public class Edge {
               sourceTaskIndex, destinationVertex.getTotalTasks(),
               destTaskIndices);
         }
+        EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT, 
+            destinationVertex.getName(), 
+            sourceVertex.getName(), 
+            null);
+        if (isDataMovementEvent) {
+          destMeta.setIndex(((DataMovementEvent)event).getTargetIndex());
+        } else {
+          destMeta.setIndex(((InputFailedEvent)event).getTargetIndex());
+        }
+        tezEvent.setDestinationInfo(destMeta);
         for(Integer destTaskIndex : destTaskIndices) {
-          EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT, 
-              destinationVertex.getName(), 
-              sourceVertex.getName(), 
-              null); // will be filled by Task when sending the event. Is it needed?
-          if (isDataMovementEvent) {
-            destMeta.setIndex(((DataMovementEvent)event).getTargetIndex());
-          } else {
-            destMeta.setIndex(((InputFailedEvent)event).getTargetIndex());
-          }
-          tezEvent.setDestinationInfo(destMeta);
           Task destTask = destinationVertex.getTask(destTaskIndex);
           if (destTask == null) {
             throw new TezUncheckedException("Unexpected null task." +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index 230e987..c825a0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -55,9 +55,8 @@ public class ScatterGatherEdgeManager implements EdgeManager {
   @Override
   public void routeEventToDestinationTasks(InputFailedEvent event,
       int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-    int destinationTaskIndex = event.getSourceIndex();
     event.setTargetIndex(sourceTaskIndex);
-    taskIndices.add(new Integer(destinationTaskIndex));
+    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
   }
 
   @Override
@@ -71,4 +70,10 @@ public class ScatterGatherEdgeManager implements EdgeManager {
     return numDestTasks;
   }
   
+  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
+    for(int i=0; i<numDestinationTasks; ++i) {
+      taskIndices.add(new Integer(i));
+    }    
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index dc806d7..b02d51f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -79,6 +80,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -89,12 +91,16 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 public class TaskAttemptImpl implements TaskAttempt,
     EventHandler<TaskAttemptEvent> {
@@ -138,7 +144,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   
   private Set<TezTaskAttemptID> uniquefailedOutputReports = 
       new HashSet<TezTaskAttemptID>();
-  private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.5;
+  private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.25;
 
   protected final TaskLocationHint locationHint;
   protected final boolean isRescheduled;
@@ -508,7 +514,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       if (oldState != getInternalState()) {
           LOG.info(attemptId + " TaskAttempt Transitioned from "
            + oldState + " to "
-           + getInternalState());
+           + getInternalState() + " due to event "
+           + event.getType());
       }
     } finally {
       writeLock.unlock();
@@ -1203,14 +1210,28 @@ public class TaskAttemptImpl implements TaskAttempt,
       
       // If needed we can also use the absolute number of reported output errors
       // If needed we can launch a background task without failing this task
-      // If needed we can consider only running consumer tasks
       // to generate a copy of the output just in case.
+      // If needed we can consider only running consumer tasks
       if (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION) {
         return attempt.getInternalState();
       }
       String message = attempt.getID() + " being failed for too many output errors";
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
+      // send input failed event
+      Vertex vertex = attempt.getVertex();
+      Map<Vertex, Edge> edges = vertex.getOutputVertices();
+      if (edges != null && !edges.isEmpty()) {
+        List<TezEvent> tezIfEvents = Lists.newArrayListWithCapacity(edges.size());
+        for (Vertex edgeVertex : edges.keySet()) {
+          tezIfEvents.add(new TezEvent(new InputFailedEvent(), 
+              new EventMetaData(EventProducerConsumerType.SYSTEM, 
+                  vertex.getName(), 
+                  edgeVertex.getName(), 
+                  attempt.getID())));
+        }
+        attempt.sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
+      }
       // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
       if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
         (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 48bb55b..8d1826d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -341,6 +341,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               new TaskRescheduledAfterVertexSuccessTransition())
 
           // Ignore-able events
+          .addTransition(
+              VertexState.SUCCEEDED, VertexState.SUCCEEDED,
+              // accumulate these in case we get restarted
+              VertexEventType.V_ROUTE_EVENT,
+              ROUTE_EVENT_TRANSITION)
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -349,8 +354,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   // We should have been in RUNNING state if we had triggered the
                   // reruns.
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
-                  // accumulate these in case we get restarted
-                  VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_COMPLETED))
 
           // Transitions from FAILED state

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 87a43fc..9ae5e22 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -65,6 +65,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
@@ -917,21 +918,21 @@ public class TestTaskAttempt {
     TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
     TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 3));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
     
     // failure threshold not met. state is SUCCEEDED
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
     
     // sending same error again doesnt change anything
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 3));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
 
     // different destination attempt reports error. now threshold crossed
     TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);    
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 3));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
     
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
@@ -993,6 +994,13 @@ public class TestTaskAttempt {
           clock, taskHeartbeatHandler, appContext,
           locationHint, isRescheduled, resource, containerContext, leafVertex);
     }
+    
+    Vertex mockVertex = mock(Vertex.class);
+    
+    @Override
+    protected Vertex getVertex() {
+      return mockVertex;
+    }
 
     @Override
     protected TaskSpec createRemoteTaskSpec() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 1b0f45c..af43d9a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -121,24 +121,29 @@ public class TezEvent implements Writable {
       switch (eventType) {
       case DATA_MOVEMENT_EVENT:
         DataMovementEvent dmEvt = (DataMovementEvent) event;
-        eventBytes = DataMovementEventProto.newBuilder()
-          .setSourceIndex(dmEvt.getSourceIndex())
-          .setTargetIndex(dmEvt.getTargetIndex())
-          .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
-          .build().toByteArray();
+        DataMovementEventProto.Builder dmBuilder = DataMovementEventProto.newBuilder();
+        dmBuilder.setSourceIndex(dmEvt.getSourceIndex()).
+        setTargetIndex(dmEvt.getTargetIndex()).setVersion(dmEvt.getVersion());
+        if (dmEvt.getUserPayload() != null) {
+          dmBuilder.setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()));
+        }
+        eventBytes = dmBuilder.build().toByteArray();
         break;
       case VERTEX_MANAGER_EVENT:
         VertexManagerEvent vmEvt = (VertexManagerEvent) event;
-        eventBytes = VertexManagerEventProto.newBuilder()
-          .setTargetVertexName(vmEvt.getTargetVertexName())
-          .setUserPayload(ByteString.copyFrom(vmEvt.getUserPayload()))
-          .build().toByteArray();
+        VertexManagerEventProto.Builder vmBuilder = VertexManagerEventProto.newBuilder();
+        vmBuilder.setTargetVertexName(vmEvt.getTargetVertexName());
+        if (vmEvt.getUserPayload() != null) {
+          vmBuilder.setUserPayload(ByteString.copyFrom(vmEvt.getUserPayload()));
+        }
+        eventBytes = vmBuilder.build().toByteArray();
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
         eventBytes = InputReadErrorEventProto.newBuilder()
             .setIndex(ideEvt.getIndex())
             .setDiagnostics(ideEvt.getDiagnostics())
+            .setVersion(ideEvt.getVersion())
             .build().toByteArray();
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
@@ -160,10 +165,13 @@ public class TezEvent implements Writable {
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEvent liEvent = (RootInputDataInformationEvent) event;
-        eventBytes = RootInputDataInformationEventProto.newBuilder()
-            .setIndex(liEvent.getIndex())
-            .setUserPayload(ByteString.copyFrom(liEvent.getUserPayload()))
-            .build().toByteArray();
+        RootInputDataInformationEventProto.Builder riBuilder =
+            RootInputDataInformationEventProto.newBuilder();
+        riBuilder.setIndex(liEvent.getIndex());
+        if (liEvent.getUserPayload() != null) {
+          riBuilder.setUserPayload(ByteString.copyFrom(liEvent.getUserPayload()));
+        }
+        eventBytes = riBuilder.build().toByteArray();
         break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
@@ -194,13 +202,14 @@ public class TezEvent implements Writable {
             DataMovementEventProto.parseFrom(eventBytes);
         event = new DataMovementEvent(dmProto.getSourceIndex(),
             dmProto.getTargetIndex(),
-            dmProto.getUserPayload().toByteArray());
+            dmProto.getVersion(),
+            dmProto.getUserPayload() != null ? dmProto.getUserPayload().toByteArray() : null);
         break;
       case VERTEX_MANAGER_EVENT:
         VertexManagerEventProto vmProto =
             VertexManagerEventProto.parseFrom(eventBytes);
         event = new VertexManagerEvent(vmProto.getTargetVertexName(),
-            vmProto.getUserPayload().toByteArray());
+            vmProto.getUserPayload() != null ? vmProto.getUserPayload().toByteArray() : null);
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEventProto ideProto =
@@ -225,8 +234,8 @@ public class TezEvent implements Writable {
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
             .parseFrom(eventBytes);
-        event = new RootInputDataInformationEvent(difProto.getIndex(), difProto
-            .getUserPayload().toByteArray());
+        event = new RootInputDataInformationEvent(difProto.getIndex(), 
+            difProto.getUserPayload() != null ? difProto.getUserPayload().toByteArray() : null);
         break;
       default:
         // RootInputUpdatePayload event not wrapped in a TezEvent.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 8fae1c1..3319752 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -87,7 +87,7 @@ public class ShuffleInputEventHandler {
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
-    InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
+    InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
     scheduler.obsoleteMapOutput(taIdentifier);
     LOG.info("Obsoleting output of src-task: " + taIdentifier);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
new file mode 100644
index 0000000..3a43eca
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFaultTolerance {
+  private static final Log LOG = LogFactory.getLog(TestFaultTolerance.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster;
+  private static FileSystem remoteFs;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestFaultTolerance.class.getName() + "-tmpDir";
+  
+  private static Resource defaultResource = Resource.newInstance(100, 0);
+  private static TezSession tezSession = null;
+  
+  @BeforeClass
+  public static void setup() throws Exception {
+    LOG.info("Starting mini clusters");
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestFaultTolerance.class.getName(),
+          4, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      remoteFs = new RawLocalFileSystem();
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use the raw local file system
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+      
+      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+          .valueOf(new Random().nextInt(100000))));
+      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+      
+      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          remoteStagingDir.toString());
+
+      AMConfiguration amConfig = new AMConfiguration(
+          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+          tezConf, null);
+      TezSessionConfiguration tezSessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("TestFaultTolerance", tezSessionConfig);
+      tezSession.start();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    LOG.info("Stopping mini clusters");
+    if (tezSession != null) {
+      tezSession.stop();
+    }
+    if (miniTezCluster != null) {
+      miniTezCluster.stop();
+      miniTezCluster = null;
+    }
+    if (remoteFs != null) {
+      remoteFs.close();
+    }
+  }
+  
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    TezSessionStatus status = tezSession.getSessionStatus();
+    while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+      LOG.info("Waiting for session to be ready. Current: " + status);
+      Thread.sleep(100);
+      status = tezSession.getSessionStatus();
+    }
+    if (status == TezSessionStatus.SHUTDOWN) {
+      throw new TezUncheckedException("Unexpected Session shutdown");
+    }
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for dag to complete. Sleeping for 500ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appId: " + dagClient.getApplicationId()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(100);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+  
+  ProcessorDescriptor getProcDesc(byte[] payload) {
+    return new ProcessorDescriptor(TestProcessor.class.getName()).
+        setUserPayload(payload);
+  }
+  
+  InputDescriptor getInputDesc(byte[] payload) {
+    return new InputDescriptor(TestInput.class.getName()).
+        setUserPayload(payload);
+  }
+  
+  OutputDescriptor getOutputDesc(byte[] payload) {
+    return new OutputDescriptor(TestOutput.class.getName()).
+        setUserPayload(payload);
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicSuccessScatterGather() throws Exception {
+    DAG dag = new DAG("testBasicSuccessScatterGather");
+    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(null))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicSuccessBroadcast() throws Exception {
+    DAG dag = new DAG("testBasicSuccessBroadcast");
+    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.BROADCAST, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(null))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicTaskFailure() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testBasicTaskFailure");
+    Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(null))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testTaskMultipleFailures() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0,1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testTaskMultipleFailures");
+    Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(null))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testTaskMultipleFailuresDAGFail() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), -1);
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testTaskMultipleFailuresDAGFail");
+    Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(null))));
+    runDAGAndVerify(dag, DAGStatus.State.FAILED);
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicInputFailureWithExit() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testBasicInputFailureWithExit");
+    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(payload))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testBasicInputFailureWithoutExit() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testBasicInputFailureWithoutExit");
+    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(payload))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testMultipleInputFailureWithoutExit() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0,1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testMultipleInputFailureWithoutExit");
+    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(payload))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testMultiVersionInputFailureWithoutExit() throws Exception {
+    Configuration testConf = new Configuration();
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    testConf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+    
+    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+    
+    DAG dag = new DAG("testMultiVersionInputFailureWithoutExit");
+    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
+    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            getOutputDesc(null), 
+            getInputDesc(payload))));
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
new file mode 100644
index 0000000..5d0ac71
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -0,0 +1,276 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * LogicalInput used for writing tests for Tez. Supports fault
+ * injection based on configuration. All the failure injection happens when the
+ * processor calls doRead() on the input. Thus doRead() blocks until all the
+ * failure injection is completed. The input waits for all the incoming data to
+ * be ready by waiting for DataMovement events. Then it checks if any of those
+ * need to be failed. It fails them by sending InputReadError events. Then it
+ * either exits or waits for the data to be re-generated. When the data is
+ * re-generated, it goes through the above cycle again. When no more failures
+ * are to be injected, then it completes. All configuration items are post-fixed
+ * by the name of the vertex that executes this input.
+ */
+public class TestInput implements LogicalInput {
+  private static final Log LOG = LogFactory
+      .getLog(TestInput.class);
+  
+  Configuration conf;
+  TezInputContext inputContext;
+  int numInputs;
+  int numCompletedInputs = 0;
+  int[] completedInputVersion;
+  AtomicInteger inputReady = new AtomicInteger(-1);
+  int lastInputReadyValue = -1;
+  int failingInputUpto = 0;
+  
+  boolean doFail = false;
+  boolean doFailAndExit = false;
+  Set<Integer> failingTaskIndices = Sets.newHashSet();
+  Set<Integer> failingTaskAttempts = Sets.newHashSet();
+  Set<Integer> failingInputIndices = Sets.newHashSet();
+  Integer failAll = new Integer(-1);
+  
+  /**
+   * Enable failure for this logical input
+   */
+  public static String TEZ_AM_FAILING_INPUT_DO_FAIL =
+      "tez.am.failing-input.do-fail";
+  /**
+   * Logical input will exit (and cause task failure) after reporting failure to 
+   * read.
+   */
+  public static String TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT =
+      "tez.am.failing-input.do-fail-and-exit";
+  /**
+   * Which physical inputs to fail. This is a comma separated list of
+   * non-negative integers. -1 means fail all.
+   */
+  public static String TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX =
+      "tez.am.failing-input.failing-input-index";
+  /**
+   * Up to which version of the above physical inputs to fail. 0 will fail the 
+   * first version. 1 will fail the first and second versions. And so on.
+   */
+  public static String TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT =
+      "tez.am.failing-input.failing-upto-input-attempt";
+  /**
+   * Indices of the tasks in the vertex for which this input will fail. Comma
+   * separated list of non-negative integers. -1 means all tasks. E.g. 0 means
+   * the first task in the vertex will have failing inputs.
+   */
+  public static String TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX =
+      "tez.am.failing-input.failing-task-index";
+  /**
+   * Which task attempts will fail the input. This is a comma separated list of
+   * non-negative integers. -1 means all will fail. E.g. specifying 1 means the
+   * first attempt will not fail the input but a re-run (the second attempt) will
+   * trigger input failure. So this can be used to simulate cascading failures.
+   */
+  public static String TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT =
+      "tez.am.failing-input.failing-task-attempt";
+
+  public int doRead() {
+    boolean done = true;
+    do {
+      done = true;
+      synchronized (inputReady) {
+        while (inputReady.get() <= lastInputReadyValue) {
+          try {
+            LOG.info("Waiting for inputReady: " + inputReady.get() + 
+                " last: " + lastInputReadyValue);
+            inputReady.wait();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+        LOG.info("Done for inputReady: " + inputReady.get());
+        lastInputReadyValue = inputReady.get();
+      }
+      if (doFail) {
+        if (
+            (failingTaskIndices.contains(failAll) ||
+            failingTaskIndices.contains(inputContext.getTaskIndex())) &&
+            (failingTaskAttempts.contains(failAll) || 
+             failingTaskAttempts.contains(inputContext.getTaskAttemptNumber())) &&
+             (lastInputReadyValue <= failingInputUpto)) {
+          List<Event> events = Lists.newLinkedList();
+          if (failingInputIndices.contains(failAll)) {
+            for (int i=0; i<numInputs; ++i) {
+              String msg = ("FailingInput: " + inputContext.getUniqueIdentifier() + 
+                  " index: " + i + " version: " + lastInputReadyValue);
+              events.add(new InputReadErrorEvent(msg, i, lastInputReadyValue));
+              LOG.info("Failing input: " + msg);
+            }
+          } else {
+            for (Integer index : failingInputIndices) {
+              if (index.intValue() >= numInputs) {
+                throwException("InputIndex: " + index.intValue() + 
+                    " should be less than numInputs: " + numInputs);
+              }
+              if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
+                continue; // do not fail a previous version now.
+              }
+              String msg = ("FailingInput: " + inputContext.getUniqueIdentifier() + 
+                  " index: " + index.intValue() + " version: " + lastInputReadyValue);
+              events.add(new InputReadErrorEvent(msg, index.intValue(), lastInputReadyValue));
+              LOG.info("Failing input: " + msg);
+            }
+          }
+          inputContext.sendEvents(events);
+          if (doFailAndExit) {
+            String msg = "FailingInput exiting: " + inputContext.getUniqueIdentifier();
+            LOG.info(msg);
+            throwException(msg);
+          } else {
+            done = false;
+          }
+        }
+      }
+    } while (!done);
+    return numInputs;
+  }
+  
+  void throwException(String msg) {
+    RuntimeException e = new RuntimeException(msg);
+    inputContext.fatalError(e , msg);
+    throw e;
+  }
+  
+  public static String getVertexConfName(String confName, String vertexName) {
+    return confName + "." + vertexName;
+  }
+  
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    this.inputContext = inputContext;
+    if (inputContext.getUserPayload() != null) {
+      String vName = inputContext.getTaskVertexName();
+      conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());
+      doFail = conf.getBoolean(getVertexConfName(TEZ_AM_FAILING_INPUT_DO_FAIL, vName), false);
+      doFailAndExit = conf.getBoolean(
+          getVertexConfName(TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
+      LOG.info("doFail: " + doFail + " doFailAndExit: " + doFailAndExit);
+      if (doFail) {
+        for (String failingIndex : 
+          conf.getTrimmedStringCollection(
+              getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, vName))) {
+          LOG.info("Adding failing task index: " + failingIndex);
+          failingTaskIndices.add(Integer.valueOf(failingIndex));
+        }
+        for (String failingIndex : 
+          conf.getTrimmedStringCollection(
+              getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, vName))) {
+          LOG.info("Adding failing task attempt: " + failingIndex);
+          failingTaskAttempts.add(Integer.valueOf(failingIndex));
+        }
+        failingInputUpto = conf.getInt(
+            getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, vName), 0);
+        LOG.info("Adding failing input upto: " + failingInputUpto);
+        for (String failingIndex : 
+          conf.getTrimmedStringCollection(
+              getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, vName))) {
+          LOG.info("Adding failing input index: " + failingIndex);
+          failingInputIndices.add(Integer.valueOf(failingIndex));
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Reader getReader() throws Exception {
+    return null;
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws Exception {
+    for (Event event : inputEvents) {
+      if (event instanceof DataMovementEvent) {
+        DataMovementEvent dmEvent = (DataMovementEvent) event;
+        numCompletedInputs++;
+        LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + 
+            " targetId: " + dmEvent.getTargetIndex() +
+            " version: " + dmEvent.getVersion() +
+            " numInputs: " + numInputs +
+            " numCompletedInputs: " + numCompletedInputs);
+        this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion();
+      } else if (event instanceof InputFailedEvent) {
+        InputFailedEvent ifEvent = (InputFailedEvent) event;
+        numCompletedInputs--;
+        LOG.info("Received InputFailed event sourceId : " + ifEvent.getSourceIndex() + 
+            " targetId: " + ifEvent.getTargetIndex() +
+            " version: " + ifEvent.getVersion() +
+            " numInputs: " + numInputs +
+            " numCompletedInputs: " + numCompletedInputs);
+      }
+    }
+    if (numCompletedInputs == numInputs) {
+      for (int i=0; i<numInputs; ++i) {
+        if (completedInputVersion[i] < 0) {
+          LOG.info("Not received completion for input " + i);
+          return;
+        }
+      }
+      LOG.info("Received all inputs");
+      synchronized (inputReady) {
+        int newVal = inputReady.incrementAndGet();
+        LOG.info("Notifying done with " + newVal);
+        inputReady.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public List<Event> close() throws Exception {
+    return null;
+  }
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+    this.completedInputVersion = new int[numInputs];
+    for (int i=0; i<numInputs; ++i) {
+      this.completedInputVersion[i] = -1;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
new file mode 100644
index 0000000..bb4f7b5
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -0,0 +1,71 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+
+import com.google.common.collect.Lists;
+
+public class TestOutput implements LogicalOutput {
+  private static final Log LOG = LogFactory.getLog(TestOutput.class);
+  
+  int numOutputs;
+  TezOutputContext outputContext;
+  
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws Exception {
+    this.outputContext = outputContext;
+    return null;
+  }
+
+  @Override
+  public Writer getWriter() throws Exception {
+    return null;
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+  }
+
+  @Override
+  public List<Event> close() throws Exception {
+    LOG.info("Sending data movement event");
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+    for (int i = 0; i < numOutputs; i++) {
+      DataMovementEvent event = new DataMovementEvent(i, null);
+      events.add(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numOutputs = numOutputs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25c32021/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
new file mode 100644
index 0000000..d539585
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -0,0 +1,155 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezTaskContext;
+
+import com.google.common.collect.Sets;
+
+/**
+ * LogicalIOProcessor used to write tests. Supports fault injection through
+ * configuration. The configuration is post-fixed by the name of the vertex for
+ * this processor. The fault injection executes in the run() method. The
+ * processor first sleeps for a specified interval. Then checks if it needs to
+ * fail. It fails and exits if configured to do so. If not, then it calls
+ * doRead() on all inputs to let them fail.
+ */
+public class TestProcessor implements LogicalIOProcessor {
+  private static final Log LOG = LogFactory.getLog(TestProcessor.class);
+
+  Configuration conf;
+  TezTaskContext processorContext;
+
+  boolean doFail = false;
+  long sleepMs;
+  Set<Integer> failingTaskIndices = Sets.newHashSet();
+  int failingTaskAttemptUpto = 0;
+  Integer failAll = Integer.valueOf(-1); // wildcard: matches every task index and attempt
+
+  /**
+   * Enable failure for this processor
+   */
+  public static final String TEZ_AM_FAILING_PROCESSOR_DO_FAIL =
+      "tez.am.failing-processor.do-fail";
+  /**
+   * Time to sleep in the processor in milliseconds.
+   */
+  public static final String TEZ_AM_FAILING_PROCESSOR_SLEEP_MS =
+      "tez.am.failing-processor.sleep-ms";
+  /**
+   * The indices of tasks in the vertex for which the processor will fail. This
+   * is a comma-separated list of non-negative integers. -1 means all fail.
+   */
+  public static final String TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX =
+      "tez.am.failing-processor.failing-task-index";
+  /**
+   * Up to which attempt of the tasks will fail. Specifying 0 means the first
+   * attempt will fail. 1 means first and second attempt will fail. -1 means all.
+   */
+  public static final String TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT =
+      "tez.am.failing-processor.failing-upto-task-attempt";
+
+  /** Calls fatalError on the task context with msg, then throws the same RuntimeException. */
+  void throwException(String msg) {
+    RuntimeException e = new RuntimeException(msg);
+    processorContext.fatalError(e, msg);
+    throw e;
+  }
+
+  /** Returns the per-vertex form of a configuration key: confName + "." + vertexName. */
+  public static String getVertexConfName(String confName, String vertexName) {
+    return confName + "." + vertexName;
+  }
+
+  /** Loads this vertex's fault-injection settings from the user payload, if one is set. */
+  @Override
+  public void initialize(TezProcessorContext processorContext) throws Exception {
+    this.processorContext = processorContext;
+    if (processorContext.getUserPayload() != null) {
+      String vName = processorContext.getTaskVertexName();
+      conf = MRHelpers.createConfFromUserPayload(processorContext.getUserPayload());
+      doFail = conf.getBoolean(
+          getVertexConfName(TEZ_AM_FAILING_PROCESSOR_DO_FAIL, vName), false);
+      sleepMs = conf.getLong(
+          getVertexConfName(TEZ_AM_FAILING_PROCESSOR_SLEEP_MS, vName), 0);
+      LOG.info("doFail: " + doFail);
+      if (doFail) {
+        for (String failingIndex : conf.getTrimmedStringCollection(
+            getVertexConfName(TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, vName))) {
+          LOG.info("Adding failing task index: " + failingIndex);
+          failingTaskIndices.add(Integer.valueOf(failingIndex));
+        }
+        failingTaskAttemptUpto = conf.getInt(
+            getVertexConfName(TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
+        LOG.info("Adding failing attempt : " + failingTaskAttemptUpto +
+            " dag: " + processorContext.getDAGName());
+      }
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+  }
+
+  @Override
+  public void close() throws Exception {
+  }
+
+  /** Sleeps, then fails if so configured; otherwise calls doRead() on every input. */
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+    LOG.info("Sleeping ms: " + sleepMs);
+    Thread.sleep(sleepMs);
+
+    // Fail only if both the task index and the attempt number match (-1 matches all).
+    if (doFail
+        && (failingTaskIndices.contains(failAll)
+            || failingTaskIndices.contains(processorContext.getTaskIndex()))
+        && (failingTaskAttemptUpto == failAll.intValue()
+            || failingTaskAttemptUpto >= processorContext.getTaskAttemptNumber())) {
+      String msg = "FailingProcessor: " + processorContext.getUniqueIdentifier() +
+          " dag: " + processorContext.getDAGName() +
+          " taskIndex: " + processorContext.getTaskIndex() +
+          " taskAttempt: " + processorContext.getTaskAttemptNumber();
+      LOG.info(msg);
+      throwException(msg);
+    }
+
+    for (Map.Entry<String, LogicalInput> entry : inputs.entrySet()) {
+      LOG.info("Reading input: " + entry.getKey());
+      TestInput input = (TestInput) entry.getValue();
+      input.doRead();
+    }
+  }
+
+}


Mime
View raw message