tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1131. Simplify EdgeManager APIs
Date Wed, 11 Jun 2014 01:25:16 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 719db0fd0 -> bd0e6bd2c


TEZ-1131. Simplify EdgeManager APIs


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bd0e6bd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bd0e6bd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bd0e6bd2

Branch: refs/heads/master
Commit: bd0e6bd2cdf9b49630375006c14e3f5fc5d25cb6
Parents: 719db0f
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Jun 10 18:25:08 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Jun 10 18:25:08 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/dag/api/EdgeManager.java     |  50 ++++----
 .../apache/tez/dag/api/EdgeManagerContext.java  |  14 ++-
 .../runtime/api/events/InputFailedEvent.java    |  19 ---
 tez-api/src/main/proto/Events.proto             |   5 +-
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |  49 ++++----
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 122 ++++++++++---------
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  24 ++--
 .../app/dag/impl/ScatterGatherEdgeManager.java  |  40 +++---
 .../org/apache/tez/test/EdgeManagerForTest.java |  11 +-
 .../apache/tez/runtime/api/impl/TezEvent.java   |   4 +-
 .../vertexmanager/ShuffleVertexManager.java     |  65 +++++-----
 .../src/main/proto/ShufflePayloads.proto        |   5 +-
 .../vertexmanager/TestShuffleVertexManager.java |  34 ++++--
 .../java/org/apache/tez/test/TestInput.java     |   3 +-
 15 files changed, 223 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d096414..5fe251e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-1018. VertexManagerPluginContext should enable assigning locality to
   scheduled tasks
   TEZ-1169. Allow numPhysicalInputs to be specified for RootInputs.
+  TEZ-1131. Simplify EdgeManager APIs
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
index e7beec1..a3970fc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
@@ -52,69 +52,63 @@ public interface EdgeManager {
   
   /**
    * Get the number of physical inputs on the destination task
-   * @param numSourceTasks Total number of source tasks
    * @param destinationTaskIndex Index of destination task for which number of 
    * inputs is needed
    * @return Number of physical inputs on the destination task
    */
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
-      int destinationTaskIndex);
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex);
 
   /**
    * Get the number of physical outputs on the source task
-   * @param numDestinationTasks Total number of destination tasks
    * @param sourceTaskIndex Index of the source task for which number of outputs 
    * is needed
    * @return Number of physical outputs on the source task
    */
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
-      int sourceTaskIndex);
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex);
   
   /**
    * Return the routing information to inform consumers about the source task
-   * output that is now available. The return Map has the routing information.
-   * Key is the destination task physical input index and the value is the list
-   * of destination task indices for which the key input index will receive the
-   * data movement event.
+   * output that is now available. The return map has the routing information.
+   * The event will be routed to every destination task index in the key of the
+   * map. Every physical input in the value for that task key will receive the
+   * input.
+   * 
    * @param event
-   *          Data movement event
+   *          Data movement event that contains the output information
    * @param sourceTaskIndex
-   *          Source task
-   * @param numDestinationTasks
-   *          Total number of destination tasks
-   * @param inputIndicesToTaskIndices
+   *          Source task that produced the event
+   * @param sourceOutputIndex
+   *          Index of the physical output on the source task that produced the
+   *          event
+   * @param destinationTaskAndInputIndices
    *          Map via which the routing information is returned
    */
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices);
+      int sourceTaskIndex, int sourceOutputIndex,
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices);
   
   /**
    * Return the routing information to inform consumers about the failure of a
-   * source task whose outputs have been potentially lost. The return Map has
-   * the routing information. Key is the destination task physical input index
-   * and the value is the list of destination task indices for which the key
-   * input index will receive the input failure notification. This method will
+   * source task whose outputs have been potentially lost. The return map has
+   * the routing information. The failure notification event will be sent to
+   * every task index in the key of the map. Every physical input in the value
+   * for that task key will receive the failure notification. This method will
    * be called once for every source task failure and information for all
    * affected destinations must be provided in that invocation.
    * 
    * @param sourceTaskIndex
    *          Source task
-   * @param numDestinationTasks
-   *          Total number of destination tasks
-   * @param inputIndicesToTaskIndices
+   * @param destinationTaskAndInputIndices
    *          Map via which the routing information is returned
    */
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices);
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices);
 
   /**
    * Get the number of destination tasks that consume data from the source task
    * @param sourceTaskIndex Source task index
-   * @param numDestinationTasks Total number of destination tasks
    */
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks);
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex);
   
   /**
    * Return the source task index to which to send the input error event

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
index f2f9236..4158647 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
@@ -34,12 +34,22 @@ public interface EdgeManagerContext {
    * Returns the source vertex name 
    * @return the source vertex name
    */
-  public String getSrcVertexName();
+  public String getSourceVertexName();
   
   /**
    * Returns the destination vertex name
    * @return the destination vertex name
    */
-  public String getDestVertexName();
+  public String getDestinationVertexName();
+
+  /**
+   * Returns the number of tasks in the source vertex
+   */
+  public int getSourceVertexNumTasks();
+
+  /**
+   * Returns the number of tasks in the destination vertex
+   */
+  public int getDestinationVertexNumTasks();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
index a4648f3..2dc1adf 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
@@ -30,12 +30,6 @@ import org.apache.tez.runtime.api.Event;
 public class InputFailedEvent extends Event{
 
   /**
-   * Index(i) of the i-th (physical) Input or Output that generated the data.
-   * For a Processor-generated event, this is ignored.
-   */
-  private int sourceIndex;
-
-  /**
    * Index(i) of the i-th (physical) Input or Output that is meant to receive
    * this Event. For a Processor event, this is ignored.
    */
@@ -51,24 +45,11 @@ public class InputFailedEvent extends Event{
   }
   
   @Private
-  public InputFailedEvent(int sourceIndex,
-      int targetIndex,
-      int version) {
-    this.sourceIndex = sourceIndex;
-    this.targetIndex = targetIndex;
-    this.version = version;
-  }
-  
-  @Private
   public InputFailedEvent(int targetIndex, int version) {
     this.targetIndex = targetIndex;
     this.version = version;
   }
 
-  public int getSourceIndex() {
-    return sourceIndex;
-  }
-
   public int getTargetIndex() {
     return targetIndex;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index 19d1978..ab4525e 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -34,9 +34,8 @@ message InputReadErrorEventProto {
 }
 
 message InputFailedEventProto {
-  optional int32 source_index = 1;
-  optional int32 target_index = 2;
-  optional int32 version = 4;
+  optional int32 target_index = 1;
+  optional int32 version = 2;
 }
 
 message VertexManagerEventProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 305e085..6628af5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -26,42 +27,45 @@ import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Lists;
-
 public class BroadcastEdgeManager implements EdgeManager {
 
+  EdgeManagerContext context;
   @Override
   public void initialize(EdgeManagerContext edgeManagerContext) {
-    // Nothing to do.
+    this.context = edgeManagerContext;
   }
   
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
-      int destinationTaskIndex) {
-    return numSourceTasks;
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+    return context.getSourceVertexNumTasks();
   }
   
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
-      int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
     return 1;
   }
   
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
-    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
-    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
+      int sourceTaskIndex, int sourceOutputIndex, 
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    List<Integer> inputIndices = 
+        Collections.unmodifiableList(Collections.singletonList(sourceTaskIndex));
+    // for each task make the i-th source task as the i-th physical input
+    for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) {
+      destinationTaskAndInputIndices.put(i, inputIndices);
+    }
   }
   
   @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
-    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
-    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    List<Integer> inputIndices = 
+        Collections.unmodifiableList(Collections.singletonList(sourceTaskIndex));
+    // for each task make the i-th source task as the i-th physical input
+    for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) {
+      destinationTaskAndInputIndices.put(i, inputIndices);
+    }
   }
 
   @Override
@@ -70,16 +74,9 @@ public class BroadcastEdgeManager implements EdgeManager {
     return event.getIndex();
   }
   
-  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
-    for(int i=0; i<numDestinationTasks; ++i) {
-      taskIndices.add(new Integer(i));
-    }    
-  }
-
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex,
-      int numDestTasks) {
-    return numDestTasks;
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index b6dfaa4..c4d9c70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -54,22 +54,14 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 public class Edge {
 
-  static class EdgeManagerContextImpl implements EdgeManagerContext {
+  class EdgeManagerContextImpl implements EdgeManagerContext {
 
-    private final String srcVertexName;
-    private final String destVertexName;
     private final TezUserPayload userPayload;
 
-    EdgeManagerContextImpl(String srcVertexName, String destVertexName,
-        @Nullable byte[] userPayload) {
-      checkNotNull(srcVertexName, "srcVertexName is null");
-      checkNotNull(destVertexName, "destVertexName is null");
-      this.srcVertexName = srcVertexName;
-      this.destVertexName = destVertexName;
+    EdgeManagerContextImpl(@Nullable byte[] userPayload) {
       this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     }
 
@@ -79,14 +71,25 @@ public class Edge {
     }
 
     @Override
-    public String getSrcVertexName() {
-      return srcVertexName;
+    public String getSourceVertexName() {
+      return sourceVertex.getName();
     }
 
     @Override
-    public String getDestVertexName() {
-      return destVertexName;
+    public String getDestinationVertexName() {
+      return destinationVertex.getName();
     }
+    
+    @Override
+    public int getSourceVertexNumTasks() {
+      return sourceVertex.getTotalTasks();
+    }
+
+    @Override
+    public int getDestinationVertexNumTasks() {
+      return destinationVertex.getTotalTasks();
+    }
+
   }
 
   private EdgeProperty edgeProperty;
@@ -140,8 +143,7 @@ public class Edge {
         bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
       }
     }
-    edgeManagerContext = new EdgeManagerContextImpl(sourceVertex.getName(),
-        destinationVertex.getName(), bb);
+    edgeManagerContext = new EdgeManagerContextImpl(bb);
     if (edgeManager != null) {
       edgeManager.initialize(edgeManagerContext);
     }
@@ -198,8 +200,7 @@ public class Edge {
         "Edge Manager must be initialized by this time");
     return new InputSpec(sourceVertex.getName(),
         edgeProperty.getEdgeDestination(),
-        edgeManager.getNumDestinationTaskPhysicalInputs(sourceVertex.getTotalTasks(),
-            destinationTaskIndex));
+        edgeManager.getNumDestinationTaskPhysicalInputs(destinationTaskIndex));
   }
 
   public OutputSpec getSourceSpec(int sourceTaskIndex) {
@@ -207,7 +208,7 @@ public class Edge {
         "Edge Manager must be initialized by this time");
     return new OutputSpec(destinationVertex.getName(),
         edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskPhysicalOutputs(
-        destinationVertex.getTotalTasks(), sourceTaskIndex));
+        sourceTaskIndex));
   }
   
   public void startEventBuffering() {
@@ -240,7 +241,7 @@ public class Edge {
         int srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
             destTaskIndex);
         int numConsumers = edgeManager.getNumDestinationConsumerTasks(
-            srcTaskIndex, destinationVertex.getTotalTasks());
+            srcTaskIndex);
         Task srcTask = sourceVertex.getTask(srcTaskIndex);
         if (srcTask == null) {
           throw new TezUncheckedException("Unexpected null task." +
@@ -279,46 +280,57 @@ public class Edge {
   
   void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex,
       boolean isDataMovementEvent,
-      Map<Integer, List<Integer>> ifInputIndicesToTaskIndices) {
+      Map<Integer, List<Integer>> taskAndInputIndices) {    
     Preconditions.checkState(edgeManager != null, 
         "Edge Manager must be initialized by this time");
-    int num = 0;
     Event event = tezEvent.getEvent();
-    for (Map.Entry<Integer, List<Integer>> entry : ifInputIndicesToTaskIndices.entrySet()) {
-      ++num;
-      TezEvent tezEventToSend = null;
-      if (num == ifInputIndicesToTaskIndices.size()) {
-        if (isDataMovementEvent) {
-          ((DataMovementEvent) event).setTargetIndex(entry.getKey().intValue());
-        } else {
-          ((InputFailedEvent) event).setTargetIndex(entry.getKey().intValue());
-        }
-        tezEventToSend = tezEvent;
-      } else {
-        Event e;
-        if (isDataMovementEvent) {
-          DataMovementEvent dmEvent = (DataMovementEvent) event;
-          e = new DataMovementEvent(dmEvent.getSourceIndex(), 
-              entry.getKey().intValue(), dmEvent.getVersion(), dmEvent.getUserPayload());
-        } else {
-          InputFailedEvent ifEvent = ((InputFailedEvent) event);
-          e = new InputFailedEvent(entry.getKey().intValue(), ifEvent.getVersion());
+    boolean isFirstEvent = true;
+    // cache of event object per input index
+    Map<Integer, TezEvent> inputIndicesWithEvents = Maps.newHashMap(); 
+    for (Map.Entry<Integer, List<Integer>> entry : taskAndInputIndices.entrySet()) {
+      int destTaskIndex = entry.getKey();
+      List<Integer> inputIndices = entry.getValue();
+      for(int i=0; i<inputIndices.size(); ++i) {
+        Integer inputIndex = inputIndices.get(i);
+        TezEvent tezEventToSend = inputIndicesWithEvents.get(inputIndex);
+        if (tezEventToSend == null) {
+          if (isFirstEvent) {
+            isFirstEvent = false;
+            // this is the first item - reuse the event object
+            if (isDataMovementEvent) {
+              ((DataMovementEvent) event).setTargetIndex(inputIndex);
+            } else {
+              ((InputFailedEvent) event).setTargetIndex(inputIndex);
+            }
+            tezEventToSend = tezEvent;
+          } else {
+            // create new event object for this input index
+            Event e;
+            if (isDataMovementEvent) {
+              DataMovementEvent dmEvent = (DataMovementEvent) event;
+              e = new DataMovementEvent(dmEvent.getSourceIndex(), 
+                  inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
+            } else {
+              InputFailedEvent ifEvent = ((InputFailedEvent) event);
+              e = new InputFailedEvent(inputIndex, ifEvent.getVersion());
+            }
+            tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+          }
+          tezEventToSend.setDestinationInfo(destinationMetaInfo);
+          // cache the event object per input because are unique per input index
+          inputIndicesWithEvents.put(inputIndex, tezEventToSend);
         }
-        tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
-      }
-      tezEventToSend.setDestinationInfo(destinationMetaInfo);
-      for(Integer destTaskIndex : entry.getValue()) {
         Task destTask = destinationVertex.getTask(destTaskIndex);
         if (destTask == null) {
           throw new TezUncheckedException("Unexpected null task." +
               " sourceVertex=" + sourceVertex.getVertexId() +
-              " srcIndex = " + srcTaskIndex +
-              " destAttemptId=" + destinationVertex.getVertexId() +
-              " destIndex=" + destTaskIndex + 
+              " srcTaskIndex = " + srcTaskIndex +
+              " destVertex=" + destinationVertex.getVertexId() +
+              " destTaskIndex=" + destTaskIndex + 
               " edgeManager=" + edgeManager.getClass().getName());
         }
         TezTaskID destTaskId = destTask.getTaskId();
-        sendEventToTask(destTaskId, tezEventToSend);
+        sendEventToTask(destTaskId, tezEventToSend);      
       }
     }
   }
@@ -336,7 +348,7 @@ public class Edge {
         isDataMovementEvent = false;
         // fall through
       case DATA_MOVEMENT_EVENT:
-        Map<Integer, List<Integer>> inputIndicesToTaskIndices = Maps
+        Map<Integer, List<Integer>> destTaskAndInputIndices = Maps
         .newHashMap();
         TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo()
             .getTaskAttemptID();
@@ -344,14 +356,14 @@ public class Edge {
         if (isDataMovementEvent) {
           DataMovementEvent dmEvent = (DataMovementEvent)tezEvent.getEvent();
           edgeManager.routeDataMovementEventToDestination(dmEvent,
-                srcTaskIndex, destinationVertex.getTotalTasks(),
-                inputIndicesToTaskIndices);
+                srcTaskIndex, dmEvent.getSourceIndex(),
+                destTaskAndInputIndices);
         } else {
           edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex,
-              destinationVertex.getTotalTasks(), inputIndicesToTaskIndices);
+              destTaskAndInputIndices);
         }
-        if (!inputIndicesToTaskIndices.isEmpty()) {
-          sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, inputIndicesToTaskIndices);
+        if (!destTaskAndInputIndices.isEmpty()) {
+          sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, destTaskAndInputIndices);
         } else {
           throw new TezUncheckedException("Event must be routed." +
               " sourceVertex=" + sourceVertex.getVertexId() +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 1de25b9..20b2068 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -29,37 +29,35 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 public class OneToOneEdgeManager implements EdgeManager {
 
+  List<Integer> destinationInputIndices = 
+      Collections.unmodifiableList(Collections.singletonList(0)); 
+
   @Override
   public void initialize(EdgeManagerContext edgeManagerContext) {
     // Nothing to do.
   }
 
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numDestinationTasks, 
-      int destinationTaskIndex) {
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
     return 1;
   }
   
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
-      int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
     return 1;
   }
   
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, 
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    inputIndicesToTaskIndices.put(new Integer(0), 
-        Collections.singletonList(new Integer(sourceTaskIndex)));
+      int sourceTaskIndex, int sourceOutputIndex, 
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
   }
   
   @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    inputIndicesToTaskIndices.put(new Integer(0), 
-        Collections.singletonList(new Integer(sourceTaskIndex)));
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
   }
 
   @Override
@@ -69,7 +67,7 @@ public class OneToOneEdgeManager implements EdgeManager {
   }
   
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestTasks) {
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
     return 1;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index 9f4d61e..541f4dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -27,40 +27,38 @@ import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Lists;
-
 public class ScatterGatherEdgeManager implements EdgeManager {
 
+  EdgeManagerContext context;
   @Override
   public void initialize(EdgeManagerContext edgeManagerContext) {
-    // Nothing to do.
+    this.context = edgeManagerContext;
   }
 
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
-      int destinationTaskIndex) {
-    return numSourceTasks;
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+    return context.getSourceVertexNumTasks();
   }
   
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
-    return numDestinationTasks;
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
   }
 
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), 
-        Collections.singletonList(new Integer(event.getSourceIndex())));
+      int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    // the i-th source output goes to the i-th destination task
+    // the n-th source task becomes the n-th physical input on the task
+    destinationTaskAndInputIndices.put(sourceOutputIndex, Collections.singletonList(sourceTaskIndex));
   }
 
   @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
-    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
-    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) {
+      destinationTaskAndInputIndices.put(i, Collections.singletonList(sourceTaskIndex));
+    }
   }
 
   @Override
@@ -70,14 +68,8 @@ public class ScatterGatherEdgeManager implements EdgeManager {
   }
 
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestTasks) {
-    return numDestTasks;
-  }
-  
-  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
-    for(int i=0; i<numDestinationTasks; ++i) {
-      taskIndices.add(new Integer(i));
-    }    
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index 24b39fe..93a0a58 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -56,22 +56,22 @@ public class EdgeManagerForTest implements EdgeManager {
   }
 
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, int destinationTaskIndex) {
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
     return 0;
   }
 
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
     return 0;
   }
 
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex,
-      int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+      int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
   }
 
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
     return 0;
   }
 
@@ -82,8 +82,7 @@ public class EdgeManagerForTest implements EdgeManager {
 
   @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices) { 
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) { 
   }
   
   // End of overridden methods

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index ecadeb5..29ded25 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -164,7 +164,6 @@ public class TezEvent implements Writable {
       case INPUT_FAILED_EVENT:
         InputFailedEvent ifEvt = (InputFailedEvent) event;
         eventBytes = InputFailedEventProto.newBuilder()
-            .setSourceIndex(ifEvt.getSourceIndex())
             .setTargetIndex(ifEvt.getTargetIndex())
             .setVersion(ifEvt.getVersion()).build().toByteArray();
         break;
@@ -228,8 +227,7 @@ public class TezEvent implements Writable {
       case INPUT_FAILED_EVENT:
         InputFailedEventProto ifProto =
             InputFailedEventProto.parseFrom(eventBytes);
-        event = new InputFailedEvent(ifProto.getSourceIndex(),
-            ifProto.getTargetIndex(), ifProto.getVersion());
+        event = new InputFailedEvent(ifProto.getTargetIndex(), ifProto.getVersion());
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 5b489ed..7c34bad 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -131,6 +131,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     int numDestinationTasks;
     int basePartitionRange;
     int remainderRangeForLastShuffler;
+    int numSourceTasks;
 
     public CustomShuffleEdgeManager() {
     }
@@ -155,11 +156,11 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
       this.numDestinationTasks = config.numDestinationTasks;
       this.basePartitionRange = config.basePartitionRange;
       this.remainderRangeForLastShuffler = config.remainderRangeForLastShuffler;
+      this.numSourceTasks = config.numSourceTasks;
     }
 
     @Override
-    public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
-        int destinationTaskIndex) {
+    public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
       int partitionRange = 1;
       if(destinationTaskIndex < numDestinationTasks-1) {
         partitionRange = basePartitionRange;
@@ -170,14 +171,14 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
 
     @Override
-    public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
-        int sourceTaskIndex) {
+    public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
       return numSourceTaskOutputs;
     }
     
     @Override
     public void routeDataMovementEventToDestination(DataMovementEvent event,
-        int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+        int sourceTaskIndex, int sourceOutputIndex, 
+        Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
       int sourceIndex = event.getSourceIndex();
       int destinationTaskIndex = sourceIndex/basePartitionRange;
       int partitionRange = 1;
@@ -192,39 +193,42 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
           sourceTaskIndex * partitionRange 
           + sourceIndex % partitionRange;
       
-      inputIndicesToTaskIndices.put(new Integer(targetIndex),
-          Collections.singletonList(new Integer(destinationTaskIndex)));
+      destinationTaskAndInputIndices.put(new Integer(destinationTaskIndex),
+          Collections.singletonList(new Integer(targetIndex)));
     }
     
     @Override
     public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, 
-        int numDestinationTasks, 
-        Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+        Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
       if (remainderRangeForLastShuffler < basePartitionRange) {
-        List<Integer> lastTask = Collections.singletonList(
-            new Integer(numDestinationTasks-1));
-        List<Integer> otherTasks = Lists.newArrayListWithCapacity(numDestinationTasks-1);
-        for (int i=0; i<numDestinationTasks-1; ++i) {
-          otherTasks.add(new Integer(i));
-        }
-        
         int startOffset = sourceTaskIndex * basePartitionRange;
+        List<Integer> allIndices = Lists.newArrayListWithCapacity(basePartitionRange);
         for (int i=0; i<basePartitionRange; ++i) {
-          inputIndicesToTaskIndices.put(new Integer(startOffset+i), otherTasks);
+          allIndices.add(startOffset + i);
         }
+        List<Integer> inputIndices = Collections.unmodifiableList(allIndices);
+        for (int i=0; i<numDestinationTasks-1; ++i) {
+          destinationTaskAndInputIndices.put(i, inputIndices);
+        }
+        
+        
         startOffset = sourceTaskIndex * remainderRangeForLastShuffler;
+        allIndices = Lists.newArrayListWithCapacity(remainderRangeForLastShuffler);
         for (int i=0; i<remainderRangeForLastShuffler; ++i) {
-          inputIndicesToTaskIndices.put(new Integer(startOffset+i), lastTask);
+          allIndices.add(startOffset+i);
         }
+        inputIndices = Collections.unmodifiableList(allIndices);
+        destinationTaskAndInputIndices.put(numDestinationTasks-1, inputIndices);
       } else {
         // all tasks have same pattern
-        List<Integer> allTasks = Lists.newArrayListWithCapacity(numDestinationTasks);
-        for (int i=0; i<numDestinationTasks; ++i) {
-          allTasks.add(new Integer(i));
-        }
-        int startOffset = sourceTaskIndex * basePartitionRange;
+        int startOffset = sourceTaskIndex * basePartitionRange;        
+        List<Integer> allIndices = Lists.newArrayListWithCapacity(basePartitionRange);
         for (int i=0; i<basePartitionRange; ++i) {
-          inputIndicesToTaskIndices.put(new Integer(startOffset+i), allTasks);
+          allIndices.add(startOffset + i);
+        }
+        List<Integer> inputIndices = Collections.unmodifiableList(allIndices);
+        for (int i=0; i<numDestinationTasks; ++i) {
+          destinationTaskAndInputIndices.put(i, inputIndices);
         }
       }
     }
@@ -242,9 +246,8 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
 
     @Override
-    public int getNumDestinationConsumerTasks(int sourceTaskIndex,
-        int numDestTasks) {
-      return numDestTasks;
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+      return numDestinationTasks;
     }
    }
 
@@ -253,15 +256,18 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     int numDestinationTasks;
     int basePartitionRange;
     int remainderRangeForLastShuffler;
+    int numSourceTasks;
 
     private CustomShuffleEdgeManagerConfig(int numSourceTaskOutputs,
         int numDestinationTasks,
+        int numSourceTasks,
         int basePartitionRange,
         int remainderRangeForLastShuffler) {
       this.numSourceTaskOutputs = numSourceTaskOutputs;
       this.numDestinationTasks = numDestinationTasks;
       this.basePartitionRange = basePartitionRange;
       this.remainderRangeForLastShuffler = remainderRangeForLastShuffler;
+      this.numSourceTasks = numSourceTasks;
     }
 
     public byte[] toUserPayload() {
@@ -270,6 +276,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
           .setNumDestinationTasks(numDestinationTasks)
           .setBasePartitionRange(basePartitionRange)
           .setRemainderRangeForLastShuffler(remainderRangeForLastShuffler)
+          .setNumSourceTasks(numSourceTasks)
           .build().toByteArray();
     }
 
@@ -280,6 +287,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
       return new CustomShuffleEdgeManagerConfig(
           proto.getNumSourceTaskOutputs(),
           proto.getNumDestinationTasks(),
+          proto.getNumSourceTasks(),
           proto.getBasePartitionRange(),
           proto.getRemainderRangeForLastShuffler());
 
@@ -418,7 +426,8 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
         // for the source tasks
         CustomShuffleEdgeManagerConfig edgeManagerConfig =
             new CustomShuffleEdgeManagerConfig(
-                currentParallelism, finalTaskParallelism, basePartitionRange,
+                currentParallelism, finalTaskParallelism, 
+                numSourceTasks, basePartitionRange,
                 ((remainderRangeForLastShuffler > 0) ?
                     remainderRangeForLastShuffler : basePartitionRange));
         EdgeManagerDescriptor edgeManagerDescriptor =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 9c711bb..2d658ba 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -46,6 +46,7 @@ message VertexManagerEventPayloadProto {
 message ShuffleEdgeManagerConfigPayloadProto {
   optional int32 num_source_task_outputs = 1;
   optional int32 num_destination_tasks = 2;
-  optional int32 base_partition_range = 3;
-  optional int32 remainder_range_for_last_shuffler = 4;
+  optional int32 num_source_tasks = 3;
+  optional int32 base_partition_range = 4;
+  optional int32 remainder_range_for_last_shuffler = 5;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index fce6bc3..fbeba3a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -135,14 +135,24 @@ public class TestShuffleVertexManager {
               }
 
               @Override
-              public String getSrcVertexName() {
+              public String getSourceVertexName() {
                 return null;
               }
 
               @Override
-              public String getDestVertexName() {
+              public String getDestinationVertexName() {
                 return null;
               }
+
+              @Override
+              public int getSourceVertexNumTasks() {
+                return 0;
+              }
+
+              @Override
+              public int getDestinationVertexNumTasks() {
+                return 0;
+              }
             });
             newEdgeManagers.put(entry.getKey(), edgeManager);
           }
@@ -231,28 +241,28 @@ public class TestShuffleVertexManager {
     EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
     Map<Integer, List<Integer>> targets = Maps.newHashMap();
     DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
-    edgeManager.routeDataMovementEventToDestination(dmEvent, 1, 2, targets);
+    edgeManager.routeDataMovementEventToDestination(dmEvent, 1, dmEvent.getSourceIndex(), targets);
     Assert.assertEquals(1, targets.size());
     Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
-    Assert.assertEquals(3, e.getKey().intValue());
+    Assert.assertEquals(0, e.getKey().intValue());
     Assert.assertEquals(1, e.getValue().size());
-    Assert.assertEquals(0, e.getValue().get(0).intValue());
+    Assert.assertEquals(3, e.getValue().get(0).intValue());
     targets.clear();
     dmEvent = new DataMovementEvent(2, new byte[0]);
-    edgeManager.routeDataMovementEventToDestination(dmEvent, 0, 2, targets);
+    edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), targets);
     Assert.assertEquals(1, targets.size());
     e = targets.entrySet().iterator().next();
-    Assert.assertEquals(0, e.getKey().intValue());
+    Assert.assertEquals(1, e.getKey().intValue());
     Assert.assertEquals(1, e.getValue().size());
-    Assert.assertEquals(1, e.getValue().get(0).intValue());
+    Assert.assertEquals(0, e.getValue().get(0).intValue());
     targets.clear();
-    edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2, targets);
+    edgeManager.routeInputSourceTaskFailedEventToDestination(2, targets);
     Assert.assertEquals(2, targets.size());
     for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
-      Assert.assertTrue(entry.getKey().intValue() == 4 || entry.getKey().intValue() == 5);
+      Assert.assertTrue(entry.getKey().intValue() == 0 || entry.getKey().intValue() == 1);
       Assert.assertEquals(2, entry.getValue().size());
-      Assert.assertEquals(0, entry.getValue().get(0).intValue());
-      Assert.assertEquals(1, entry.getValue().get(1).intValue());
+      Assert.assertEquals(4, entry.getValue().get(0).intValue());
+      Assert.assertEquals(5, entry.getValue().get(1).intValue());
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd0e6bd2/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index ced91f1..e8c8ed1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -280,8 +280,7 @@ public class TestInput extends AbstractLogicalInput {
       } else if (event instanceof InputFailedEvent) {
         InputFailedEvent ifEvent = (InputFailedEvent) event;
         numCompletedInputs--;
-        LOG.info("Received InputFailed event sourceId : " + ifEvent.getSourceIndex() + 
-            " targetId: " + ifEvent.getTargetIndex() +
+        LOG.info("Received InputFailed event targetId: " + ifEvent.getTargetIndex() +
             " version: " + ifEvent.getVersion() +
             " numInputs: " + numPhysicalInputs +
             " numCompletedInputs: " + numCompletedInputs);


Mime
View raw message