tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [48/50] [abbrv] tez git commit: TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case (jeagles)
Date Tue, 06 Dec 2016 17:07:19 GMT
TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/43ca78fe
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/43ca78fe
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/43ca78fe

Branch: refs/heads/TEZ-3334
Commit: 43ca78fea0115e05c8d14626f470010e2f7334c3
Parents: 8079919
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Mon Dec 5 11:13:17 2016 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Mon Dec 5 11:13:17 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/api/EdgeManagerPluginOnDemand.java  |  30 ++++-
 .../api/events/CompositeDataMovementEvent.java  |   6 +
 .../CompositeRoutedDataMovementEvent.java       | 126 ++++++++++++++++++
 tez-api/src/main/proto/Events.proto             |   8 ++
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |   4 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  37 ++----
 .../dag/impl/OneToOneEdgeManagerOnDemand.java   |   7 +-
 .../app/dag/impl/ScatterGatherEdgeManager.java  |   6 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   1 +
 .../tez/dag/app/TestMockDAGAppMaster.java       |   9 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   2 +-
 .../org/apache/tez/test/EdgeManagerForTest.java |   2 +-
 .../org/apache/tez/common/ProtoConverters.java  |  22 ++++
 .../apache/tez/runtime/api/impl/EventType.java  |   1 +
 .../apache/tez/runtime/api/impl/TezEvent.java   |  14 ++
 .../vertexmanager/FairShuffleEdgeManager.java   |   9 +-
 .../vertexmanager/ShuffleVertexManager.java     |   6 +-
 .../CartesianProductEdgeManager.java            |   5 +-
 .../CartesianProductEdgeManagerPartitioned.java |   5 +-
 .../CartesianProductEdgeManagerReal.java        |   6 +-
 ...artesianProductEdgeManagerUnpartitioned.java |   4 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |  55 ++++++--
 .../ShuffleInputEventHandlerOrderedGrouped.java |  54 ++++++--
 .../TestFairShuffleVertexManager.java           |  85 ++++++------
 ...tCartesianProductEdgeManagerPartitioned.java |  85 ++++++------
 ...artesianProductEdgeManagerUnpartitioned.java | 131 ++++++++++---------
 .../tez/test/TestExceptionPropagation.java      |   2 +-
 28 files changed, 490 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4603cf..d9a7ca6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case.
   TEZ-3547. Add TaskAssignment Analyzer.
   TEZ-3508. TestTaskScheduler cleanup.
   TEZ-3536. NPE in WebUIService start when host resolution fails.

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
index 05c0c62..3d7f2ab 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
@@ -40,6 +40,34 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 @Unstable
 public abstract class EdgeManagerPluginOnDemand extends EdgeManagerPlugin {
 
+  public static class CompositeEventRouteMetadata {
+    private final int count;
+    private final int target;
+    private final int source;
+
+    public static CompositeEventRouteMetadata create(int count, int target, int source) {
+      return new CompositeEventRouteMetadata(count, target, source);
+    }
+
+    private CompositeEventRouteMetadata(int count, int target, int source) {
+      this.count = count;
+      this.target = target;
+      this.source = source;
+    }
+
+    public int getCount() {
+      return count;
+    }
+
+    public int getTarget() {
+      return target;
+    }
+
+    public int getSource() {
+      return source;
+    }
+  }
+
   /**
    * Class to provide routing metadata for {@link Event}s to be routed between
    * producer and consumer tasks. The routing data enabled the system to send 
@@ -237,7 +265,7 @@ public abstract class EdgeManagerPluginOnDemand extends EdgeManagerPlugin {
    *         source task.
    * @throws Exception
    */
-  public abstract @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public abstract @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex) throws Exception;
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
index c45d272..32089a9 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 import org.apache.tez.runtime.api.Event;
 
 /**
@@ -139,4 +140,9 @@ public class CompositeDataMovementEvent extends Event {
     };
   }
 
+  @Private
+  public CompositeRoutedDataMovementEvent expandRouted(CompositeEventRouteMetadata routeMeta) {
+    return CompositeRoutedDataMovementEvent.create(routeMeta.getSource(), routeMeta.getTarget(), routeMeta.getCount(), version, userPayload);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java
new file mode 100644
index 0000000..6dbed71
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * A convenience class to specify multiple DataMovementEvents which share the same payload.
+ * This event is generated by the Edge in the Application Master and sent to the downstream
+ * vertex input as an optimization. It is not meant to be consumed by any user code.
+ */
+@Public
+public final class CompositeRoutedDataMovementEvent extends Event {
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that generated an Event.
+   * For a Processor-generated event, this is ignored.
+   */
+  private final int sourceIndex;
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that is meant to receive
+   * this Event. For a Processor event, this is ignored.
+   */
+  private int targetIndex;
+  private int count;
+
+  /**
+   * User Payload for this Event
+   */
+  private final ByteBuffer userPayload;
+
+  /**
+   * Version number to indicate what attempt generated this Event
+   */
+  private int version;
+
+
+  @Private
+  public static CompositeRoutedDataMovementEvent create(int sourceIndex,
+                                                        int targetIndex,
+                                                        int count,
+                                                        int version,
+                                                        ByteBuffer userPayload) {
+    return new CompositeRoutedDataMovementEvent(sourceIndex, targetIndex, count, version, userPayload);
+  }
+
+  @Private
+  CompositeRoutedDataMovementEvent(int sourceIndex,
+                                   int targetIndex,
+                                   int count,
+                                   int version,
+                                   ByteBuffer userPayload) {
+    this.userPayload = userPayload;
+    this.sourceIndex = sourceIndex;
+    this.version = version;
+    this.targetIndex = targetIndex;
+    this.count = count;
+  }
+
+  public ByteBuffer getUserPayload() {
+    return userPayload == null ? null : userPayload.asReadOnlyBuffer();
+  }
+
+  public int getSourceIndex() {
+    return sourceIndex;
+  }
+
+  public int getTargetIndex() {
+    return targetIndex;
+  }
+
+  @Private
+  public void setTargetIndex(int targetIndex) {
+    this.targetIndex = targetIndex;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
+  @Private
+  public void setCount(int count) {
+    this.count = count;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  @Private
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  @Override
+  public String toString() {
+    return "CompositeRoutedDataMovementEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+        + targetIndex + ", count=" + count + ", version=" + version + "]";
+  }
+
+  @Private
+  public DataMovementEvent expand(int offset) {
+    return DataMovementEvent.create(sourceIndex + offset, targetIndex + offset, version, userPayload);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index 490fa3c..e018864 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -27,6 +27,14 @@ message DataMovementEventProto {
   optional int32 version = 4;
 }
 
+message CompositeRoutedDataMovementEventProto {
+  optional int32 source_index = 1;
+  optional int32 target_index = 2;
+  optional int32 count = 3;
+  optional bytes user_payload = 4;
+  optional int32 version = 5;
+}
+
 message InputReadErrorEventProto {
   optional int32 index = 1;
   optional string diagnostics = 2;

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index d14527d..ca510f7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -78,10 +78,10 @@ public class BroadcastEdgeManager extends EdgeManagerPluginOnDemand {
   }
   
   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex)
       throws Exception {
-    return commonRouteMeta[sourceTaskIndex];
+    return CompositeEventRouteMetadata.create(1, sourceTaskIndex, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index bb4d319..9640f06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -49,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -549,36 +551,15 @@ public class Edge {
         switch (tezEvent.getEventType()) {
         case COMPOSITE_DATA_MOVEMENT_EVENT:
           {
-            CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent) tezEvent.getEvent();        
-            EventRouteMetadata routeMeta;
-            int numEventsDone;
-            if (pendingRoutes != null) {
-              routeMeta = pendingRoutes.getRouteMeta();
-              numEventsDone = pendingRoutes.getNumEventsRouted();
-            } else {
-              routeMeta = edgeManagerOnDemand
+            CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent) tezEvent.getEvent(); 
+            CompositeEventRouteMetadata routeMeta = edgeManagerOnDemand
                   .routeCompositeDataMovementEventToDestination(srcTaskIndex, taskIndex);
-              numEventsDone = 0;
-            }
+
             if (routeMeta != null) {
-              int listSize = listToAdd.size();
-              int numEvents = routeMeta.getNumEvents();
-              int[] sourceIndices = routeMeta.getSourceIndices();
-              int[] targetIndices = routeMeta.getTargetIndices();
-              while (numEventsDone < numEvents && listSize++ < listMaxSize) {
-                DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
-                    targetIndices[numEventsDone]);
-                numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
-                    tezEvent.getEventReceivedTime());
-                tezEventToSend.setDestinationInfo(destinationMetaInfo);
-                listToAdd.add(tezEventToSend);
-              }
-              if (numEventsDone < numEvents) {
-                pendingEvents.put(attemptID, new PendingEventRouteMetadata(routeMeta, tezEvent,
-                    numEventsDone));
-                return false;
-              }
+              CompositeRoutedDataMovementEvent edme = compEvent.expandRouted(routeMeta);
+              TezEvent tezEventToSend = new TezEvent(edme, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime());
+              tezEventToSend.setDestinationInfo(destinationMetaInfo);
+              listToAdd.add(tezEventToSend);
             }
           }
           break;

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
index 84e7e66..819735a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
@@ -41,6 +41,9 @@ public class OneToOneEdgeManagerOnDemand extends EdgeManagerPluginOnDemand {
   final EventRouteMetadata commonRouteMeta = 
       EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
 
+  final CompositeEventRouteMetadata compositeCommonRouteMeta =
+      CompositeEventRouteMetadata.create(1, 0, 0);
+
   public OneToOneEdgeManagerOnDemand(EdgeManagerPluginContext context) {
     super(context);
   }
@@ -84,11 +87,11 @@ public class OneToOneEdgeManagerOnDemand extends EdgeManagerPluginOnDemand {
   }
   
   @Override
-  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex)
       throws Exception {
     if (sourceTaskIndex == destinationTaskIndex) {
-      return commonRouteMeta;
+      return compositeCommonRouteMeta;
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index 3b66b8f..4d373ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -114,11 +114,11 @@ public class ScatterGatherEdgeManager extends EdgeManagerPluginOnDemand {
   }
   
   @Override
-  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex)
       throws Exception {
-    return EventRouteMetadata.create(1, targetIndices[sourceTaskIndex], 
-        sourceIndices[destinationTaskIndex]);
+    return CompositeEventRouteMetadata.create(1, targetIndices[sourceTaskIndex][0], 
+        sourceIndices[destinationTaskIndex][0]);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 38ef89f..3f6debf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3769,6 +3769,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         EventType lastEventType = lastEvent.getEventType();
         // if the following changes then critical path logic/recording may need revision
         if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
+            lastEventType == EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT ||
             lastEventType == EventType.DATA_MOVEMENT_EVENT ||
             lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
           task.getAttempt(attemptID).setLastEventSent(lastEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 74ac51e..6268912 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -93,6 +93,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.IOStatistics;
@@ -275,11 +276,11 @@ public class TestMockDAGAppMaster {
     tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
-    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+    Assert.assertEquals(1, ((CompositeRoutedDataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
     Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
-    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
-    targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
-    targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
+    Assert.assertEquals(1, ((CompositeRoutedDataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+    targetIndex1 = ((CompositeRoutedDataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
+    targetIndex2 = ((CompositeRoutedDataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
     // order of vA task completion can change order of events
     Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
         (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 0b96db7..966b464 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -2197,7 +2197,7 @@ public class TestDAGImpl {
     }
 
     @Override
-    public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
         int sourceTaskIndex, int destinationTaskIndex)
         throws Exception {
       if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index 9cb914f..47d133b 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -86,7 +86,7 @@ public class EdgeManagerForTest extends EdgeManagerPluginOnDemand {
   }
 
   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex)
       throws Exception {
     return null;

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index f765f42..ea90158 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -22,6 +22,7 @@ import com.google.protobuf.ByteString;
 
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
@@ -50,6 +51,27 @@ public class ProtoConverters {
         proto.getUserPayload() != null ?
             proto.getUserPayload().asReadOnlyByteBuffer() : null);
   }
+  public static EventProtos.CompositeRoutedDataMovementEventProto convertCompositeRoutedDataMovementEventToProto(
+      CompositeRoutedDataMovementEvent event) {
+    EventProtos.CompositeRoutedDataMovementEventProto.Builder builder =
+        EventProtos.CompositeRoutedDataMovementEventProto.newBuilder();
+    builder.setSourceIndex(event.getSourceIndex()).
+        setTargetIndex(event.getTargetIndex()).setVersion(event.getVersion()).setCount(event.getCount());
+    if (event.getUserPayload() != null) {
+      builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
+    }
+    return builder.build();
+  }
+
+  public static CompositeRoutedDataMovementEvent convertCompositeRoutedDataMovementEventFromProto(
+      EventProtos.CompositeRoutedDataMovementEventProto proto) {
+    return CompositeRoutedDataMovementEvent.create(proto.getSourceIndex(),
+        proto.getTargetIndex(),
+        proto.getCount(),
+        proto.getVersion(),
+        proto.getUserPayload() != null ?
+            proto.getUserPayload().asReadOnlyByteBuffer() : null);
+  }
 
   public static EventProtos.CompositeEventProto convertCompositeDataMovementEventToProto(
       CompositeDataMovementEvent event) {

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index cb247c9..e573526 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -30,4 +30,5 @@ public enum EventType {
   ROOT_INPUT_DATA_INFORMATION_EVENT,
   COMPOSITE_DATA_MOVEMENT_EVENT,
   ROOT_INPUT_INITIALIZER_EVENT,
+  COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index e76bdbb..1a90ada 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -31,9 +31,11 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos;
 import org.apache.tez.runtime.api.events.EventProtos.CompositeEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.CompositeRoutedDataMovementEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto;
@@ -82,6 +84,8 @@ public class TezEvent implements Writable {
       eventType = EventType.DATA_MOVEMENT_EVENT;
     } else if (event instanceof CompositeDataMovementEvent) {
       eventType = EventType.COMPOSITE_DATA_MOVEMENT_EVENT;
+    } else if (event instanceof CompositeRoutedDataMovementEvent) {
+      eventType = EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT;
     } else if (event instanceof VertexManagerEvent) {
       eventType = EventType.VERTEX_MANAGER_EVENT;
     } else if (event instanceof InputReadErrorEvent) {
@@ -158,6 +162,11 @@ public class TezEvent implements Writable {
             ProtoConverters.convertDataMovementEventToProto(
                 (DataMovementEvent) event);
         break;
+      case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
+          message =
+            ProtoConverters.convertCompositeRoutedDataMovementEventToProto(
+                (CompositeRoutedDataMovementEvent) event);
+      break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
         message =
             ProtoConverters.convertCompositeDataMovementEventToProto(
@@ -256,6 +265,11 @@ public class TezEvent implements Writable {
             DataMovementEventProto.parseFrom(input);
         event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
         break;
+      case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
+        CompositeRoutedDataMovementEventProto edmProto =
+            CompositeRoutedDataMovementEventProto.parseFrom(eventBytes);
+      event = ProtoConverters.convertCompositeRoutedDataMovementEventFromProto(edmProto);
+      break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
         CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
         event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java
index ff1c032..2336e15 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java
@@ -121,17 +121,16 @@ public class FairShuffleEdgeManager extends EdgeManagerPluginOnDemand {
   }
 
   @Override
-  public @Nullable EventRouteMetadata
+  public @Nullable CompositeEventRouteMetadata
       routeCompositeDataMovementEventToDestination(int sourceTaskIndex,
       int destinationTaskIndex) {
     DestinationTaskInputsProperty property = mapping.get(destinationTaskIndex);
     int firstPhysicalInputIndex =
         property.getFirstPhysicalInputIndex(sourceTaskIndex);
     if (firstPhysicalInputIndex >= 0) {
-      return EventRouteMetadata.create(property.getNumOfPartitions(),
-          getRange(firstPhysicalInputIndex, property.getNumOfPartitions()),
-          getRange(property.getFirstPartitionId(),
-          property.getNumOfPartitions()));
+      return CompositeEventRouteMetadata.create(property.getNumOfPartitions(),
+          firstPhysicalInputIndex,
+          property.getFirstPartitionId());
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 55a6ced..ed27f04 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -299,7 +299,7 @@ public class ShuffleVertexManager extends ShuffleVertexManagerBase {
     }
     
     @Override
-    public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
         int sourceTaskIndex, int destinationTaskIndex)
         throws Exception {
       int[] targetIndicesToSend;
@@ -316,8 +316,8 @@ public class ShuffleVertexManager extends ShuffleVertexManagerBase {
         partitionRange = basePartitionRange;
       }
 
-      return EventRouteMetadata.create(partitionRange, targetIndicesToSend, 
-          sourceIndices[destinationTaskIndex]);
+      return CompositeEventRouteMetadata.create(partitionRange, targetIndicesToSend[0], 
+          sourceIndices[destinationTaskIndex][0]);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
index 96cce94..1dbe6bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
-import org.apache.tez.dag.api.TezException;
 
 import javax.annotation.Nullable;
 
@@ -75,8 +74,8 @@ public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
 
   @Nullable
   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
-                                                                         int destTaskId)
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                                  int destTaskId)
     throws Exception {
     return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
index 644d5af..068da81 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -21,6 +21,7 @@ import com.google.common.primitives.Ints;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 import org.apache.tez.dag.api.UserPayload;
 
 import javax.annotation.Nullable;
@@ -69,12 +70,12 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager
 
   @Nullable
   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
                                                                          int destTaskId)
     throws Exception {
     int partition = CartesianProductCombination.fromTaskId(numPartitions,
       getIdealTaskId(destTaskId)).getCombination().get(positionId);
-    return EventRouteMetadata.create(1, new int[]{srcTaskId}, new int[]{partition});
+    return CompositeEventRouteMetadata.create(1, srcTaskId, partition);
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
index 705db05..3e1407c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -18,7 +18,9 @@
 package org.apache.tez.runtime.library.cartesianproduct;
 
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 
 /**
  * base class of cartesian product edge manager implementation
@@ -46,8 +48,8 @@ abstract class CartesianProductEdgeManagerReal {
                                                                          int destTaskId)
     throws Exception;
 
-  public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
-                                                                                  int destTaskId)
+  public abstract CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                                           int destTaskId)
     throws Exception;
 
   public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
index cea4142..9e46e95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -63,12 +63,12 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
 
   @Nullable
   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
                                                                          int destTaskId)
     throws Exception {
     int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
         .getCombination().get(positionId);
-    return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null;
+    return index == srcTaskId ? CompositeEventRouteMetadata.create(1, 0, 0) : null;
   }
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 7d9eacf..c1893fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -27,6 +27,7 @@ import java.util.zip.Inflater;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -93,7 +94,45 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
       numDmeEvents.incrementAndGet();
-      processDataMovementEvent((DataMovementEvent)event);
+      DataMovementEvent dmEvent = (DataMovementEvent)event;
+      DataMovementEventPayloadProto shufflePayload;
+      try {
+        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+      }
+      BitSet emptyPartitionsBitSet = null;
+      if (shufflePayload.hasEmptyPartitions()) {
+        try {
+          byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
+          emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+        } catch (IOException e) {
+          throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
+        }
+      }
+      processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet);
+      shuffleManager.updateEventReceivedTime();
+    } else if (event instanceof CompositeRoutedDataMovementEvent) {
+      CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event;
+      DataMovementEventPayloadProto shufflePayload;
+      try {
+        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+      }
+      BitSet emptyPartitionsBitSet = null;
+      if (shufflePayload.hasEmptyPartitions()) {
+        try {
+          byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
+          emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+        } catch (IOException e) {
+          throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
+        }
+      }
+      for (int offset = 0; offset < edme.getCount(); offset++) {
+        numDmeEvents.incrementAndGet();
+        processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet);
+      }
       shuffleManager.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
       numObsoletionEvents.incrementAndGet();
@@ -117,14 +156,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
         + (updateOnClose == true ? ", updateOnClose" : ""));
   }
 
-  private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
-    DataMovementEventPayloadProto shufflePayload;
-    try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(
-          ByteString.copyFrom(dme.getUserPayload()));
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
-    }
+  private void processDataMovementEvent(DataMovementEvent dme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
     int srcIndex = dme.getSourceIndex();
     if (LOG.isDebugEnabled()) {
       LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
@@ -133,10 +165,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     }
 
     if (shufflePayload.hasEmptyPartitions()) {
-      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
-          .getEmptyPartitions(), inflater);
-      BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
-      if (emptyPartionsBitSet.get(srcIndex)) {
+      if (emptyPartitionsBitSet.get(srcIndex)) {
         InputAttemptIdentifier srcAttemptIdentifier =
             constructInputAttemptIdentifier(dme, shufflePayload, false);
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index f6f6da1..f39affe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.Inflater;
 
 import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,7 +82,45 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
       numDmeEvents.incrementAndGet();
-      processDataMovementEvent((DataMovementEvent) event);
+      DataMovementEvent dmEvent = (DataMovementEvent)event;
+      DataMovementEventPayloadProto shufflePayload;
+      try {
+        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+      }
+      BitSet emptyPartitionsBitSet = null;
+      if (shufflePayload.hasEmptyPartitions()) {
+        try {
+          byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
+          emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+        } catch (IOException e) {
+          throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
+        }
+      }
+      processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet);
+      scheduler.updateEventReceivedTime();
+    } else if (event instanceof CompositeRoutedDataMovementEvent) {
+      CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event;
+      DataMovementEventPayloadProto shufflePayload;
+      try {
+        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+      }
+      BitSet emptyPartitionsBitSet = null;
+      if (shufflePayload.hasEmptyPartitions()) {
+        try {
+          byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
+          emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+        } catch (IOException e) {
+          throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
+        }
+      }
+      for (int offset = 0; offset < edme.getCount(); offset++) {
+        numDmeEvents.incrementAndGet();
+        processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet);
+      }
       scheduler.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
       numObsoletionEvents.incrementAndGet();
@@ -94,13 +133,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
     }
   }
 
-  private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
-    DataMovementEventPayloadProto shufflePayload;
-    try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
-    } 
+  private void processDataMovementEvent(DataMovementEvent dmEvent, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
     int partitionId = dmEvent.getSourceIndex();
     InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
 
@@ -112,8 +145,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
 
     if (shufflePayload.hasEmptyPartitions()) {
       try {
-        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
-        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
         if (emptyPartitionsBitSet.get(partitionId)) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
@@ -125,8 +156,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
           return;
         }
       } catch (IOException e) {
-        throw new TezUncheckedException("Unable to set " +
-                "the empty partition to succeeded", e);
+        throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
index 9c94c14..61ca785 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java
@@ -122,22 +122,22 @@ public class TestFairShuffleVertexManager
     // The first destination task fetches two partitions from all source tasks.
     // 6 == 3 source tasks * 2 merged partitions
     Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0));
-    EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata;
     for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) {
       for (int j = 0; j < 2; j++) {
-        routeMetadata = (j == 0) ?
-            edgeManager.routeCompositeDataMovementEventToDestination(
-                sourceTaskIndex, 0) :
-            edgeManager.routeInputSourceTaskFailedEventToDestination(
-                sourceTaskIndex, 0);
-        Assert.assertEquals(2, routeMetadata.getNumEvents());
         if (j == 0) {
-          Assert.assertArrayEquals(new int[]{0, 1},
-              routeMetadata.getSourceIndices());
+          EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
+              edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 0);
+          Assert.assertEquals(2, routeMetadata.getCount());
+          Assert.assertEquals(0, routeMetadata.getSource());
+          Assert.assertEquals(sourceTaskIndex*2, routeMetadata.getTarget());
+        } else {
+          EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata =
+              edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 0);
+          Assert.assertEquals(2, routeMetadata.getNumEvents());
+          Assert.assertArrayEquals(
+              new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2},
+              routeMetadata.getTargetIndices());
         }
-        Assert.assertArrayEquals(
-            new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2},
-            routeMetadata.getTargetIndices());
       }
     }
   }
@@ -156,22 +156,22 @@ public class TestFairShuffleVertexManager
     // The first destination task fetches two partitions from all source tasks.
     // 6 == 3 source tasks * 2 merged partitions
     Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0));
-    EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata;
     for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) {
       for (int j = 0; j < 2; j++) {
-        routeMetadata = (j == 0) ?
-            edgeManager.routeCompositeDataMovementEventToDestination(
-                sourceTaskIndex, 0) :
-            edgeManager.routeInputSourceTaskFailedEventToDestination(
-                sourceTaskIndex, 0);
-        Assert.assertEquals(2, routeMetadata.getNumEvents());
         if (j == 0) {
-          Assert.assertArrayEquals(new int[]{0, 1},
-              routeMetadata.getSourceIndices());
+          EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
+              edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 0);
+          Assert.assertEquals(2, routeMetadata.getCount());
+          Assert.assertEquals(0, routeMetadata.getSource());
+          Assert.assertEquals(sourceTaskIndex*2, routeMetadata.getTarget());
+        } else {
+          EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata =
+              edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 0);
+          Assert.assertEquals(2, routeMetadata.getNumEvents());
+          Assert.assertArrayEquals(
+              new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2},
+              routeMetadata.getTargetIndices());
         }
-        Assert.assertArrayEquals(
-            new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2},
-            routeMetadata.getTargetIndices());
       }
     }
 
@@ -179,16 +179,18 @@ public class TestFairShuffleVertexManager
     // task.
     Assert.assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
     for (int j = 0; j < 2; j++) {
-      routeMetadata = (j == 0) ?
-          edgeManager.routeCompositeDataMovementEventToDestination(
-              0, 1) :
-          edgeManager.routeInputSourceTaskFailedEventToDestination(
-              0, 1);
-      Assert.assertEquals(1, routeMetadata.getNumEvents());
       if (j == 0) {
-        Assert.assertEquals(2, routeMetadata.getSourceIndices()[0]);
+        EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
+            edgeManager.routeCompositeDataMovementEventToDestination(0, 1);
+        Assert.assertEquals(1, routeMetadata.getCount());
+        Assert.assertEquals(2, routeMetadata.getSource());
+        Assert.assertEquals(0, routeMetadata.getTarget());
+      } else {
+        EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata =
+            edgeManager.routeInputSourceTaskFailedEventToDestination(0, 1);
+        Assert.assertEquals(1, routeMetadata.getNumEvents());
+        Assert.assertEquals(0, routeMetadata.getTargetIndices()[0]);
       }
-      Assert.assertEquals(0, routeMetadata.getTargetIndices()[0]);
     }
 
     // The 3rd destination task fetches one partition from the 2nd and 3rd
@@ -196,17 +198,18 @@ public class TestFairShuffleVertexManager
     Assert.assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(2));
     for (int sourceTaskIndex = 1; sourceTaskIndex < 3; sourceTaskIndex++) {
       for (int j = 0; j < 2; j++) {
-        routeMetadata = (j == 0) ?
-            edgeManager.routeCompositeDataMovementEventToDestination(
-                sourceTaskIndex, 2) :
-            edgeManager.routeInputSourceTaskFailedEventToDestination(
-                sourceTaskIndex, 2);
-        Assert.assertEquals(1, routeMetadata.getNumEvents());
         if (j == 0) {
-          Assert.assertEquals(2, routeMetadata.getSourceIndices()[0]);
+          EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata =
+              edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 2);
+          Assert.assertEquals(1, routeMetadata.getCount());
+          Assert.assertEquals(2, routeMetadata.getSource());
+          Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTarget());
+        } else {
+          EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata =
+              edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 2);
+          Assert.assertEquals(1, routeMetadata.getNumEvents());
+          Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTargetIndices()[0]);
         }
-        Assert.assertEquals(sourceTaskIndex - 1,
-            routeMetadata.getTargetIndices()[0]);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
index 8710c55..09f3b52 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.cartesianproduct;
 
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 import org.apache.tez.dag.api.UserPayload;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,13 +62,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeDataMovementEventToDestination(1,0,1);
+    EventRouteMetadata routingData = edgeManager.routeDataMovementEventToDestination(1,0,1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
@@ -92,13 +93,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{1}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(1, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
@@ -156,13 +157,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{2}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(2, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
@@ -179,13 +180,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
@@ -218,13 +219,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
@@ -241,13 +242,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
@@ -264,13 +265,13 @@ public class TestCartesianProductEdgeManagerPartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{1}, routingData.getSourceIndices());
-    assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(1, compositeRoutingData.getSource());
+    assertEquals(1, compositeRoutingData.getTarget());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNotNull(routingData);
     assertEquals(1, routingData.getNumEvents());
     assertArrayEquals(new int[]{1}, routingData.getTargetIndices());

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
index 4c69482..ec97335 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.cartesianproduct;
 
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -56,23 +57,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(routingData);
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNull(compositeRoutingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNull(routingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
     assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
 
@@ -86,23 +87,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 2);
-    assertNull(routingData);
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 2);
+    assertNull(compositeRoutingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2);
     assertNull(routingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
     assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
 
@@ -130,23 +131,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(routingData);
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNull(compositeRoutingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNull(routingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
     assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
 
@@ -160,23 +161,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(routingData);
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNull(compositeRoutingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
     assertNull(routingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
     assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
 
@@ -190,23 +191,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
     when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
     edgeManager.initialize(config);
 
-    EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0);
-    assertNull(routingData);
+    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0);
+    assertNull(compositeRoutingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0);
+    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0);
     assertNull(routingData);
 
-    routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13);
+    assertNotNull(compositeRoutingData);
+    assertEquals(1, compositeRoutingData.getCount());
+    assertEquals(0, compositeRoutingData.getTarget());
+    assertEquals(0, compositeRoutingData.getSource());
 
     assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 7d88fdf..438a5aa 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -845,7 +845,7 @@ public class TestExceptionPropagation {
     }
 
     @Override
-    public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
         int sourceTaskIndex, int destinationTaskIndex)
         throws Exception {
       if (exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {


Mime
View raw message