tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [1/2] tez git commit: TEZ-3361. Fetch Multiple Partitions from the Shuffle Handler (jeagles)
Date Tue, 06 Dec 2016 17:31:19 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 25643aab1 -> fe6746d78


http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index b6599dc..18b824a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -29,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
@@ -79,7 +81,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
  private final int dagId;
   private final MapHost mapHost;
 
-  private final int currentPartition;
+  private final int minPartition;
+  private final int maxPartition;
 
   // Decompression of map-outputs
   private final CompressionCodec codec;
@@ -99,6 +102,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
   volatile BaseHttpConnection httpConnection;
   private final boolean asyncHttp;
+  private final boolean compositeFetch;
 
 
   // Initiative value is 0, which means it hasn't retried yet.
@@ -127,13 +131,15 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                int dagId,
                                boolean asyncHttp,
                                boolean sslShuffle,
-                               boolean verifyDiskChecksum) {
+                               boolean verifyDiskChecksum,
+                               boolean compositeFetch) {
     this.scheduler = scheduler;
     this.allocator = allocator;
     this.metrics = metrics;
     this.exceptionReporter = exceptionReporter;
     this.mapHost = mapHost;
-    this.currentPartition = this.mapHost.getPartitionId();
+    this.minPartition = this.mapHost.getPartitionId();
+    this.maxPartition = this.minPartition + this.mapHost.getPartitionCount() - 1;
     this.id = nextId.incrementAndGet();
     this.jobTokenSecretManager = jobTokenSecretMgr;
 
@@ -162,6 +168,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     this.localDiskFetchEnabled = localDiskFetchEnabled;
     this.sslShuffle = sslShuffle;
     this.verifyDiskChecksum = verifyDiskChecksum;
+    this.compositeFetch = compositeFetch;
 
     this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
   }
@@ -252,7 +259,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     }
     if(LOG.isDebugEnabled()) {
       LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + srcAttempts + ", partitionId: " + currentPartition);
+        + srcAttempts + ", partition range: " + minPartition + "-" + maxPartition);
     }
     populateRemainingMap(srcAttempts);
     // Construct the url and connect
@@ -333,7 +340,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     boolean connectSucceeded = false;
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(),
-          host.getPort(), host.getPartitionId(), applicationId, dagId, sslShuffle);
+          host.getPort(), host.getPartitionId(), host.getPartitionCount(), applicationId, dagId, sslShuffle);
       URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive());
       httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
           logIdentifier, jobTokenSecretManager);
@@ -399,134 +406,172 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
   private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
 
+  private static class MapOutputStat { // one partition's shuffle header, kept until all headers are read
+    final InputAttemptIdentifier srcAttemptId;
+    final long decompressedLength;
+    final long compressedLength;
+    final int forReduce;
+
+    MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) {
+      this.srcAttemptId = srcAttemptId;
+      this.decompressedLength = decompressedLength;
+      this.compressedLength = compressedLength;
+      this.forReduce = forReduce;
+    }
+
+    @Override
+    public String toString() {
+      return "id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce;
+    }
+  }
+
   protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
                                 DataInputStream input) throws FetcherReadTimeoutException {
     MapOutput mapOutput = null;
     InputAttemptIdentifier srcAttemptId = null;
-    long decompressedLength = -1;
-    long compressedLength = -1;
-
+    long decompressedLength = 0;
+    long compressedLength = 0;
     try {
       long startTime = System.currentTimeMillis();
-      int forReduce = -1;
-      //Read the shuffle header
-      try {
-        ShuffleHeader header = new ShuffleHeader();
-        // TODO Review: Multiple header reads in case of status WAIT ? 
-        header.readFields(input);
-        if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
+      int partitionCount = 1;
+
+      if (this.compositeFetch) {
+        // Multiple partitions are fetched
+        partitionCount = WritableUtils.readVInt(input);
+      }
+      ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount);
+      for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) {
+        MapOutputStat mapOutputStat = null;
+        try {
+          //Read the shuffle header
+          ShuffleHeader header = new ShuffleHeader();
+          // TODO Review: Multiple header reads in case of status WAIT ?
+          header.readFields(input);
+          if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
+            if (!stopped) {
+              badIdErrs.increment(1);
+              LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " +
+                  InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
+              return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Already shutdown. Ignoring invalid map id error");
+              }
+              return EMPTY_ATTEMPT_ID_ARRAY;
+            }
+          }
+
+          if (header.getCompressedLength() == 0) {
+            // Empty partitions are already accounted for
+            continue;
+          }
+
+          mapOutputStat = new MapOutputStat(scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce),
+              header.uncompressedLength,
+              header.compressedLength,
+              header.forReduce);
+          mapOutputStats.add(mapOutputStat);
+        } catch (IllegalArgumentException e) {
           if (!stopped) {
             badIdErrs.increment(1);
-            LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " +
-                InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
-            return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+            LOG.warn("Invalid map id ", e);
+            // Don't know which one was bad, so consider this one bad and dont read
+            // the remaining because we dont know where to start reading from. YARN-1773
+            return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
           } else {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Already shutdown. Ignoring invalid map id error");
+              LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " +
+                  e.getClass().getName() + ", Message: " + e.getMessage());
             }
             return EMPTY_ATTEMPT_ID_ARRAY;
           }
         }
-        srcAttemptId = 
-            scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
-        compressedLength = header.compressedLength;
-        decompressedLength = header.uncompressedLength;
-        forReduce = header.forReduce;
-      } catch (IllegalArgumentException e) {
-        if (!stopped) {
-          badIdErrs.increment(1);
-          LOG.warn("Invalid map id ", e);
-          // Don't know which one was bad, so consider this one bad and dont read
-          // the remaining because we dont know where to start reading from. YARN-1773
-          return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " +
-                e.getClass().getName() + ", Message: " + e.getMessage());
+
+        // Do some basic sanity verification
+        if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength, mapOutputStat.forReduce,
+            remaining, mapOutputStat.srcAttemptId)) {
+          if (!stopped) {
+            srcAttemptId = mapOutputStat.srcAttemptId;
+            if (srcAttemptId == null) {
+              srcAttemptId = getNextRemainingAttempt();
+              LOG.warn("Was expecting " + srcAttemptId + " but got null");
+            }
+            assert (srcAttemptId != null);
+            return new InputAttemptIdentifier[]{srcAttemptId};
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Already stopped. Ignoring verification failure.");
+            }
+            return EMPTY_ATTEMPT_ID_ARRAY;
           }
-          return EMPTY_ATTEMPT_ID_ARRAY;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength +
+              ", decomp len: " + mapOutputStat.decompressedLength);
         }
       }
 
-      // Do some basic sanity verification
-      if (!verifySanity(compressedLength, decompressedLength, forReduce,
-          remaining, srcAttemptId)) {
-        if (!stopped) {
-          if (srcAttemptId == null) {
-            LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
-            srcAttemptId = getNextRemainingAttempt();
-          }
-          assert (srcAttemptId != null);
-          return new InputAttemptIdentifier[]{srcAttemptId};
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Already stopped. Ignoring verification failure.");
+      for (MapOutputStat mapOutputStat : mapOutputStats) {
+        // Get the location for the map output - either in-memory or on-disk
+        srcAttemptId = mapOutputStat.srcAttemptId;
+        decompressedLength = mapOutputStat.decompressedLength;
+        compressedLength = mapOutputStat.compressedLength;
+        try {
+          mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id);
+        } catch (IOException e) {
+          if (!stopped) {
+            // Kill the reduce attempt
+            ioErrs.increment(1);
+            scheduler.reportLocalError(e);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Already stopped. Ignoring error from merger.reserve");
+            }
           }
           return EMPTY_ATTEMPT_ID_ARRAY;
         }
-      }
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
-            ", decomp len: " + decompressedLength);
-      }
 
-      // Get the location for the map output - either in-memory or on-disk
-      try {
-        mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id);
-      } catch (IOException e) {
-        if (!stopped) {
-          // Kill the reduce attempt
-          ioErrs.increment(1);
-          scheduler.reportLocalError(e);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Already stopped. Ignoring error from merger.reserve");
-          }
+        // Check if we can shuffle *now* ...
+        if (mapOutput.getType() == Type.WAIT) {
+          LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+          //Not an error but wait to process data.
+          return EMPTY_ATTEMPT_ID_ARRAY;
         }
-        return EMPTY_ATTEMPT_ID_ARRAY;
-      }
-      
-      // Check if we can shuffle *now* ...
-      if (mapOutput.getType() == Type.WAIT) {
-        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        //Not an error but wait to process data.
-        return EMPTY_ATTEMPT_ID_ARRAY;
-      } 
-      
-      // Go!
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("fetcher#" + id + " about to shuffle output of map " +
-            mapOutput.getAttemptIdentifier() + " decomp: " +
-            decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
-      }
 
-      if (mapOutput.getType() == Type.MEMORY) {
-        ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
-          (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
-          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
-      } else if (mapOutput.getType() == Type.DISK) {
-        ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
-          input, compressedLength, decompressedLength, LOG,
-          mapOutput.getAttemptIdentifier().toString(),
-          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
-      } else {
-        throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
-            mapOutput.getType());
-      }
+        // Go!
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("fetcher#" + id + " about to shuffle output of map " +
+              mapOutput.getAttemptIdentifier() + " decomp: " +
+              decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
+        }
 
-      // Inform the shuffle scheduler
-      long endTime = System.currentTimeMillis();
-      // Reset retryStartTime as map task make progress if retried before.
-      retryStartTime = 0;
+        if (mapOutput.getType() == Type.MEMORY) {
+          ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
+              (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
+              ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
+        } else if (mapOutput.getType() == Type.DISK) {
+          ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
+              input, compressedLength, decompressedLength, LOG,
+              mapOutput.getAttemptIdentifier().toString(),
+              ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
+        } else {
+          throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
+              mapOutput.getType());
+        }
 
-      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
-                              endTime - startTime, mapOutput, false);
-      // Note successful shuffle
-      remaining.remove(srcAttemptId.toString());
-      metrics.successFetch();
-      return null;
-    } catch (IOException ioe) {
+        // Inform the shuffle scheduler
+        long endTime = System.currentTimeMillis();
+        // Reset retryStartTime as map task make progress if retried before.
+        retryStartTime = 0;
+
+        scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
+            endTime - startTime, mapOutput, false);
+        // Note successful shuffle
+        remaining.remove(srcAttemptId.toString());
+        metrics.successFetch();
+      }
+    } catch(IOException ioe) {
       if (stopped) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Not reporting fetch failure for exception during data copy: ["
@@ -548,23 +593,24 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       }
       ioErrs.increment(1);
       if (srcAttemptId == null || mapOutput == null) {
-        LOG.info("fetcher#" + id + " failed to read map header" + 
-                 srcAttemptId + " decomp: " + 
-                 decompressedLength + ", " + compressedLength, ioe);
-        if(srcAttemptId == null) {
+        LOG.info("fetcher#" + id + " failed to read map header" +
+            srcAttemptId + " decomp: " +
+            decompressedLength + ", " + compressedLength, ioe);
+        if (srcAttemptId == null) {
           return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]);
         } else {
-          return new InputAttemptIdentifier[] {srcAttemptId};
+          return new InputAttemptIdentifier[]{srcAttemptId};
         }
       }
-      LOG.warn("Failed to shuffle output of " + srcAttemptId + 
-               " from " + host.getHostIdentifier(), ioe); 
+      LOG.warn("Failed to shuffle output of " + srcAttemptId +
+          " from " + host.getHostIdentifier(), ioe);
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
       metrics.failedFetch();
-      return new InputAttemptIdentifier[] {srcAttemptId};
+      return new InputAttemptIdentifier[]{srcAttemptId};
     }
+    return null;
   }
 
   /**
@@ -619,21 +665,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
     // partitionId verification. Isn't availalbe here because it is encoded into
     // URI
-    if (forReduce != currentPartition) {
+    if (forReduce < minPartition || forReduce > maxPartition) {
       wrongReduceErrs.increment(1);
       LOG.warn(logIdentifier + " data for the wrong partition map: " + srcAttemptId + " len: "
           + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
-          + ", expected partition: " + currentPartition);
-      return false;
-    }
-
-    // Sanity check
-    if (remaining.get(srcAttemptId.toString()) == null) {
-      wrongMapErrs.increment(1);
-      LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
+          + ", expected partition range: " + minPartition + "-" + maxPartition);
       return false;
     }
-    
     return true;
   }
   
@@ -658,7 +696,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("Fetcher " + id + " going to fetch (local disk) from " + host + " for: "
-          + srcAttempts + ", partitionId: " + currentPartition);
+          + srcAttempts + ", partition range: " + minPartition + "-" + maxPartition);
     }
 
     // List of maps to be fetched yet
@@ -678,7 +716,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
           Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
 
           TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
-              currentPartition);
+              minPartition);
 
           mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
           long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
index c2cfd06..4746306 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
@@ -86,19 +86,25 @@ class MapHost {
   private final String host;
   private final int port;
   private final int partition;
+  private final int partitionCount;
   // Tracks attempt IDs
   private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
   
-  public MapHost(String host, int port, int partition) {
+  public MapHost(String host, int port, int partition, int partitionCount) {
     this.host = host;
     this.port = port;
     this.partition = partition;
+    this.partitionCount = partitionCount; // size of the consecutive partition range that starts at 'partition'
   }
 
   public int getPartitionId() {
     return partition;
   }
 
+  public int getPartitionCount() { // fetchers fetch partitions [getPartitionId(), getPartitionId() + count - 1]
+    return partitionCount;
+  }
+
   public State getState() {
     return state;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index e5f4e5c..b3d8a6f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -187,7 +187,8 @@ public class Shuffle implements ExceptionReporter {
 
     eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
         inputContext,
-        scheduler);
+        scheduler,
+        conf);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index f39affe..d34fb5f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -26,10 +26,12 @@ import java.util.zip.Inflater;
 
 import com.google.protobuf.ByteString;
 import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -49,6 +51,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
 
   private final ShuffleScheduler scheduler;
   private final InputContext inputContext;
+  private final boolean compositeFetch;
   private final Inflater inflater;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
@@ -57,9 +60,11 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
 
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
-                                                ShuffleScheduler scheduler) {
+                                                ShuffleScheduler scheduler,
+                                                Configuration conf) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
+    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); // true: composite DMEs are registered whole, not expanded per partition
     this.inflater = TezCommonUtils.newInflater();
   }
 
@@ -101,10 +106,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
       processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet);
       scheduler.updateEventReceivedTime();
     } else if (event instanceof CompositeRoutedDataMovementEvent) {
-      CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event;
+      CompositeRoutedDataMovementEvent crdme = (CompositeRoutedDataMovementEvent)event;
       DataMovementEventPayloadProto shufflePayload;
       try {
-        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload()));
+        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(crdme.getUserPayload()));
       } catch (InvalidProtocolBufferException e) {
         throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
       }
@@ -117,9 +122,14 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
           throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
         }
       }
-      for (int offset = 0; offset < edme.getCount(); offset++) {
-        numDmeEvents.incrementAndGet();
-        processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet);
+      if (compositeFetch) {
+        numDmeEvents.addAndGet(crdme.getCount());
+        processCompositeRoutedDataMovementEvent(crdme, shufflePayload, emptyPartitionsBitSet);
+      } else {
+        for (int offset = 0; offset < crdme.getCount(); offset++) {
+          numDmeEvents.incrementAndGet();
+          processDataMovementEvent(crdme.expand(offset), shufflePayload, emptyPartitionsBitSet);
+        }
       }
       scheduler.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
@@ -135,7 +145,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
 
   private void processDataMovementEvent(DataMovementEvent dmEvent, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
     int partitionId = dmEvent.getSourceIndex();
-    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
+    CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent.getTargetIndex(), 1, dmEvent.getVersion(), shufflePayload);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
@@ -152,7 +162,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
                     + srcAttemptIdentifier + "]. Not fetching.");
           }
           numDmeEventsNoData.incrementAndGet();
-          scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null, true);
+          scheduler.copySucceeded(srcAttemptIdentifier.expand(0), null, 0, 0, 0, null, true);
           return;
         }
       } catch (IOException e) {
@@ -164,6 +174,40 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
         partitionId, srcAttemptIdentifier);
   }
 
+  // Handles a composite DME spanning getCount() consecutive source partitions; nothing is fetched if all are empty.
+  private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdmEvent, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
+    int partitionId = crdmEvent.getSourceIndex();
+    CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = constructInputAttemptIdentifier(crdmEvent.getTargetIndex(), crdmEvent.getCount(), crdmEvent.getVersion(), shufflePayload);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + crdmEvent.getTargetIndex() + ", count:" + crdmEvent.getCount()
+          + ", attemptNum: " + crdmEvent.getVersion() + ", payload: " +
+          ShuffleUtils.stringify(shufflePayload));
+    }
+
+    if (shufflePayload.hasEmptyPartitions()) {
+      boolean allPartitionsEmpty = true;
+      for (int i = 0; i < crdmEvent.getCount(); i++) {
+        int srcPartitionId = partitionId + i;
+        boolean partitionEmpty = emptyPartitionsBitSet.get(srcPartitionId);
+        allPartitionsEmpty &= partitionEmpty;
+        if (partitionEmpty) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Source partition: " + srcPartitionId + " did not generate any data. SrcAttempt: ["
+                + compositeInputAttemptIdentifier.expand(i) + "]. Not fetching.");
+          }
+          numDmeEventsNoData.incrementAndGet(); // one per empty partition; numDmeEvents was bumped by getCount()
+          scheduler.copySucceeded(compositeInputAttemptIdentifier.expand(i), null, 0, 0, 0, null, true);
+        }
+      }
+      if (allPartitionsEmpty) {
+        return;
+      }
+    }
+
+    scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(),
+        partitionId, compositeInputAttemptIdentifier);
+  }
+
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
     InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
     scheduler.obsoleteInput(taIdentifier);
@@ -175,25 +219,26 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   /**
    * Helper method to create InputAttemptIdentifier
    *
-   * @param dmEvent
+   * @param targetIndex target index of the first input the identifier covers
+   * @param targetIndexCount number of inputs covered (1 for a DataMovementEvent, getCount() for a composite event)
+   * @param version attempt number (event version) of the source task
    * @param shufflePayload
-   * @return InputAttemptIdentifier
+   * @return CompositeInputAttemptIdentifier, carrying spill info when the payload has a spill id
    */
-  private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent,
-      DataMovementEventPayloadProto shufflePayload) {
+  private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version,
+                                                                          DataMovementEventPayloadProto shufflePayload) {
     String pathComponent = (shufflePayload.hasPathComponent()) ? StringInterner.weakIntern(shufflePayload.getPathComponent()) : null;
     int spillEventId = shufflePayload.getSpillId();
-    InputAttemptIdentifier srcAttemptIdentifier = null;
+    CompositeInputAttemptIdentifier srcAttemptIdentifier = null;
     if (shufflePayload.hasSpillId()) {
       boolean lastEvent = shufflePayload.getLastEvent();
       InputAttemptIdentifier.SPILL_INFO info = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
           .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
       srcAttemptIdentifier =
-          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent
-              .getVersion(), pathComponent, false, info, spillEventId);
+          new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, false, info, spillEventId, targetIndexCount);
     } else {
       srcAttemptIdentifier =
-          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), pathComponent);
+          new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, targetIndexCount);
     }
     return srcAttemptIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 2f3c137..129e0cc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -60,6 +60,7 @@ import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -237,6 +238,7 @@ class ShuffleScheduler {
   private final float maxAllowedFailedFetchFraction;
   private final boolean checkFailedFetchSinceLastCompletion;
   private final boolean verifyDiskChecksum;
+  private final boolean compositeFetch;
 
   private volatile Thread shuffleSchedulerThread = null;
 
@@ -407,6 +409,7 @@ class ShuffleScheduler {
     this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
+    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
 
     pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
     LOG.info("ShuffleScheduler running for sourceVertex: "
@@ -1024,13 +1027,13 @@ class ShuffleScheduler {
   public synchronized void addKnownMapOutput(String inputHostName,
                                              int port,
                                              int partitionId,
-                                             InputAttemptIdentifier srcAttempt) {
+                                             CompositeInputAttemptIdentifier srcAttempt) {
     uniqueHosts.add(new HostPort(inputHostName, port));
     HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId);
 
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
-      host = new MapHost(inputHostName, port, partitionId);
+      host = new MapHost(inputHostName, port, partitionId, srcAttempt.getInputIdentifierCount());
       mapLocations.put(identifier, host);
     }
 
@@ -1040,9 +1043,10 @@ class ShuffleScheduler {
     }
 
     host.addKnownMap(srcAttempt);
-    pathToIdentifierMap.put(
-        getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(),
-            partitionId), srcAttempt);
+    for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
+      PathPartition pathPartition = new PathPartition(srcAttempt.getPathComponent(), partitionId + i);
+      pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
+    }
 
     // Mark the host as pending
     if (host.getState() == MapHost.State.PENDING) {
@@ -1102,7 +1106,7 @@ class ShuffleScheduler {
 
   public InputAttemptIdentifier getIdentifierForFetchedOutput(
       String path, int reduceId) {
-    return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
+    return pathToIdentifierMap.get(new PathPartition(path, reduceId));
   }
   
   private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
@@ -1243,11 +1247,7 @@ class ShuffleScheduler {
     }
     
   }
-  
-  private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) {
-    return new PathPartition(path, reduceId);
-  }
-  
+
   /**
    * A thread that takes hosts off of the penalty list when the timer expires.
    */
@@ -1394,7 +1394,7 @@ class ShuffleScheduler {
         codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
         connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle,
-        verifyDiskChecksum);
+        verifyDiskChecksum, compositeFetch);
   }
 
   private class FetchFutureCallback implements FutureCallback<Void> {

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 8ab65e0..142f582 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -122,6 +123,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
         codec = null;
       }
 
+      boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
       boolean ifileReadAhead = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
           TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
       int ifileReadAheadLength = 0;
@@ -145,7 +147,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
           ifileReadAhead, ifileReadAheadLength, codec, inputManager);
 
       this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), shuffleManager,
-          inputManager, codec, ifileReadAhead, ifileReadAheadLength);
+          inputManager, codec, ifileReadAhead, ifileReadAheadLength, compositeFetch);
 
       ////// End of Initial configuration
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index bd0ea0f..17a065c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -71,11 +71,11 @@ public class TestFetcher {
 
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
-        PORT, false, true);
-    builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
+        PORT, false, true, false);
+    builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 
-    FetchResult fr = new FetchResult(HOST, PORT, 0, Arrays.asList(srcAttempts));
+    FetchResult fr = new FetchResult(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
     Fetcher.HostFetchResult hfr = new Fetcher.HostFetchResult(fr, srcAttempts, false);
     doReturn(hfr).when(fetcher).setupLocalDiskFetch();
     doReturn(null).when(fetcher).doHttpFetch();
@@ -89,8 +89,8 @@ public class TestFetcher {
     // when enabled and hostname does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
-        PORT, false, true);
-    builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
+        PORT, false, true, false);
+    builder.assignWork(HOST + "_OTHER", PORT, 0, 1, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
     doReturn(null).when(fetcher).setupLocalDiskFetch();
@@ -105,8 +105,8 @@ public class TestFetcher {
     // when enabled and port does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
-        PORT, false, true);
-    builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
+        PORT, false, true, false);
+    builder.assignWork(HOST, PORT + 1, 0, 1, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
     doReturn(null).when(fetcher).setupLocalDiskFetch();
@@ -122,8 +122,8 @@ public class TestFetcher {
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
-        PORT, false, true);
-    builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
+        PORT, false, true, false);
+    builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
     doReturn(null).when(fetcher).setupLocalDiskFetch();
@@ -156,8 +156,8 @@ public class TestFetcher {
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
-        false, true);
-    builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
+        false, true, false);
+    builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 
     doAnswer(new Answer<Path>() {
@@ -275,8 +275,8 @@ public class TestFetcher {
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
-        false, true);
-    builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
+        false, true, false);
+    builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
     fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
     Assert.assertTrue(expectedSrcAttempts.length == fetcher.srcAttemptsRemaining.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index e085d1a..e6accda 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -51,6 +51,7 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -73,7 +74,7 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     int taskIndex = 1;
     Event dme = createDataMovementEvent(0, taskIndex, null);
@@ -82,8 +83,8 @@ public class TestShuffleInputEventHandlerImpl {
     eventList.add(dme);
     handler.handleEvents(eventList);
 
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0,
-        PATH_COMPONENT);
+    CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0,
+        PATH_COMPONENT, 1);
 
     verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier), eq(0));
   }
@@ -95,7 +96,7 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     int taskIndex = 1;
     Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(0));
@@ -116,7 +117,7 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     int taskIndex = 1;
     Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(1));
@@ -124,7 +125,7 @@ public class TestShuffleInputEventHandlerImpl {
     eventList.add(dme);
     handler.handleEvents(eventList);
 
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
+    CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1);
 
     verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier), eq(0));
   }
@@ -136,7 +137,7 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     int taskIndex1 = 1;
     Event dme1 = createDataMovementEvent(0, taskIndex1, createEmptyPartitionByteString(0));
@@ -149,7 +150,7 @@ public class TestShuffleInputEventHandlerImpl {
     handler.handleEvents(eventList);
 
     InputAttemptIdentifier expectedIdentifier1 = new InputAttemptIdentifier(taskIndex1, 0);
-    InputAttemptIdentifier expectedIdentifier2 = new InputAttemptIdentifier(taskIndex2, 0, PATH_COMPONENT);
+    CompositeInputAttemptIdentifier expectedIdentifier2 = new CompositeInputAttemptIdentifier(taskIndex2, 0, PATH_COMPONENT, 1);
 
     verify(shuffleManager).addCompletedInputWithNoData(eq(expectedIdentifier1));
     verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier2), eq(0));
@@ -209,22 +210,22 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     //0--> 1 with spill id 0 (attemptNum 0)
     Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(1, 0,
-        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    CompositeInputAttemptIdentifier expectedId1 = new CompositeInputAttemptIdentifier(1, 0,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
     verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0));
 
     //0--> 1 with spill id 1 (attemptNum 0)
     dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0,
-        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
+    CompositeInputAttemptIdentifier expectedId2 = new CompositeInputAttemptIdentifier(1, 0,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
     verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0));
 
     //0--> 1 with spill id 1 (attemptNum 1).  This should report exception
@@ -246,14 +247,14 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     //0--> 1 with spill id 0 (attemptNum 1).  attemptNum 0 is not sent.
     Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1,
-        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
+    CompositeInputAttemptIdentifier expected = new CompositeInputAttemptIdentifier(1, 1,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
     verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0));
 
     //Now send attemptNum 0.  This should throw exception, because attempt #1 is already added
@@ -275,7 +276,7 @@ public class TestShuffleInputEventHandlerImpl {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
-        shuffleManager, inputAllocator, null, false, 0);
+        shuffleManager, inputAllocator, null, false, 0, false);
 
     //0--> 1 with spill id 0 (attemptNum 0) with empty partitions
     BitSet bitSet = new BitSet(4);

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index 34ca13f..f026cb2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -88,7 +88,7 @@ public class TestShuffleManager {
     FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
 
     ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(
-        inputContext, shuffleManager, inputAllocator, null, false, 0);
+        inputContext, shuffleManager, inputAllocator, null, false, 0, false);
     shuffleManager.run();
 
     List<Event> eventList = new LinkedList<Event>();

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 310f1b2..a6e4c21 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -117,7 +117,7 @@ public class TestFetcher {
     doReturn(new TezCounters()).when(inputContext).getCounters();
     doReturn("src vertex").when(inputContext).getSourceVertexName();
 
-    MapHost mapHost = new MapHost(HOST, PORT, 0);
+    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
     InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
     mapHost.addKnownMap(inputAttemptIdentifier);
     List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
@@ -128,7 +128,7 @@ public class TestFetcher {
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true);
+            false, false, true, false);
 
     fetcher.call();
     verify(scheduler).getMapsForHost(mapHost);
@@ -151,13 +151,13 @@ public class TestFetcher {
 
     final boolean ENABLE_LOCAL_FETCH = true;
     final boolean DISABLE_LOCAL_FETCH = false;
-    MapHost mapHost = new MapHost(HOST, PORT, 0);
+    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
     FetcherOrderedGrouped fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true);
+            false, false, true, false);
 
     // when local mode is enabled and host and port matches use local fetch
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -169,13 +169,13 @@ public class TestFetcher {
     verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
 
     // if hostname does not match use http
-    mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
+    mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1);
     fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false ,true);
+            false, false, true, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -185,13 +185,13 @@ public class TestFetcher {
     verify(spyFetcher, times(1)).copyFromHost(mapHost);
 
     // if port does not match use http
-    mapHost = new MapHost(HOST, PORT + 1, 0);
+    mapHost = new MapHost(HOST, PORT + 1, 0, 1);
     fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true);
+            false, false, true, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -201,12 +201,12 @@ public class TestFetcher {
     verify(spyFetcher, times(1)).copyFromHost(mapHost);
 
     //if local fetch is not enabled
-    mapHost = new MapHost(HOST, PORT, 0);
+    mapHost = new MapHost(HOST, PORT, 0, 1);
     fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
         wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-        false, false, true);
+        false, false, true, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -227,11 +227,11 @@ public class TestFetcher {
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
 
-    MapHost host = new MapHost(HOST, PORT, 1);
+    MapHost host = new MapHost(HOST, PORT, 1, 1);
     FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-        false, false, true);
+        false, false, true, false);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
 
@@ -372,11 +372,11 @@ public class TestFetcher {
     when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
 
     HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
-    final MapHost host = new MapHost(HOST, PORT, 1);
+    final MapHost host = new MapHost(HOST, PORT, 1, 1);
     FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-        false, false, true);
+        false, false, true, false);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
 
 
@@ -461,14 +461,14 @@ public class TestFetcher {
     doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class));
 
     HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
-    final MapHost host = new MapHost(HOST, PORT, 1);
+    final MapHost host = new MapHost(HOST, PORT, 1, 1);
     FetcherOrderedGrouped mockFetcher =
         new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr,
             false, 0,
             null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-            true, false, true);
+            true, false, true, false);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
     fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -530,13 +530,13 @@ public class TestFetcher {
     MergeManager merger = mock(MergeManager.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
-    MapHost mapHost = new MapHost(HOST, PORT, 0);
+    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
     FetcherOrderedGrouped fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
-            false, false, true);
+            false, false, true, false);
     fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
     Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
     Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 26aa298..1d4afde 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -16,6 +16,7 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.Before;
@@ -153,7 +154,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         0,
         "src vertex");
     scheduler = spy(realScheduler);
-    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler);
+    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, config);
     mergeManager = mock(MergeManager.class);
   }
 
@@ -163,9 +164,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     int attemptNum = 0;
     int inputIdx = 0;
     Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0);
-    InputAttemptIdentifier id1 =
-        new InputAttemptIdentifier(inputIdx, attemptNum,
-            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    CompositeInputAttemptIdentifier id1 =
+        new CompositeInputAttemptIdentifier(inputIdx, attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
     handler.handleEvents(Collections.singletonList(dme1));
     int partitionId = attemptNum;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1));
@@ -173,9 +174,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
 
     //Send final_update event.
     Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
-    InputAttemptIdentifier id2 =
-        new InputAttemptIdentifier(inputIdx, attemptNum,
-            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
+    CompositeInputAttemptIdentifier id2 =
+        new CompositeInputAttemptIdentifier(inputIdx, attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1, 1);
     handler.handleEvents(Collections.singletonList(dme2));
     partitionId = attemptNum;
     assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
@@ -224,9 +225,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
     handler.handleEvents(Collections.singletonList(dme1));
 
-    InputAttemptIdentifier id1 =
-        new InputAttemptIdentifier(inputIdx, attemptNum,
-            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    CompositeInputAttemptIdentifier id1 =
+        new CompositeInputAttemptIdentifier(inputIdx, attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
 
     verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
@@ -252,8 +253,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false);
     events.add(dme);
     handler.handleEvents(events);
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
-        PATH_COMPONENT);
+    CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(targetIdx, 0,
+        PATH_COMPONENT, 1);
     int partitionId = srcIdx;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
         eq(expectedIdentifier));
@@ -310,8 +311,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     events.add(dme);
     handler.handleEvents(events);
     int partitionId = srcIdx;
-    InputAttemptIdentifier expectedIdentifier =
-        new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
+    CompositeInputAttemptIdentifier expectedIdentifier =
+        new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1);
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier));
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 31da4d0..52db21b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -87,8 +88,8 @@ public class TestShuffleScheduler {
 
       // Schedule all copies.
       for (int i = 0; i < numInputs; i++) {
-        InputAttemptIdentifier inputAttemptIdentifier =
-            new InputAttemptIdentifier(i, 0, "attempt_");
+        CompositeInputAttemptIdentifier inputAttemptIdentifier =
+            new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
         scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
@@ -132,8 +133,8 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
 
       for (int i = 0; i < numInputs; i++) {
-        InputAttemptIdentifier inputAttemptIdentifier =
-            new InputAttemptIdentifier(i, 0, "attempt_");
+        CompositeInputAttemptIdentifier inputAttemptIdentifier =
+            new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
         scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
@@ -189,8 +190,8 @@ public class TestShuffleScheduler {
 
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, inputAttemptIdentifier);
     }
@@ -203,7 +204,7 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //99 fails
@@ -211,7 +212,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
     }
 
 
@@ -221,7 +222,7 @@ public class TestShuffleScheduler {
     //Should fail here and report exception as reducer is not healthy
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 %
         totalProducerNodes),
-        10000, 200), false, true, false);
+        10000, 200, 1), false, true, false);
 
     int minFailurePerHost = conf.getInt(
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
@@ -258,8 +259,8 @@ public class TestShuffleScheduler {
 
     //Generate 0-200 events
     for (int i = 0; i < 200; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, inputAttemptIdentifier);
     }
@@ -283,7 +284,7 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     assertEquals(10, scheduler.remainingMaps.get());
@@ -293,7 +294,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
     }
 
     //Shuffle has not stalled. so no issues.
@@ -306,7 +307,7 @@ public class TestShuffleScheduler {
         new InputAttemptIdentifier(190, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" +
         (190 % totalProducerNodes),
-        10000, 190), false, true, false);
+        10000, 190, 1), false, true, false);
 
     //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
     verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
@@ -318,15 +319,15 @@ public class TestShuffleScheduler {
       inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
     }
 
     assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
@@ -339,11 +340,11 @@ public class TestShuffleScheduler {
       inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
     }
 
     // Should fail now due to fetcherHealthy. (stall has already happened and
@@ -374,8 +375,8 @@ public class TestShuffleScheduler {
 
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, inputAttemptIdentifier);
     }
@@ -388,14 +389,14 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(319, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
 
     //stall the shuffle
     scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -405,13 +406,13 @@ public class TestShuffleScheduler {
     //Retry for 3 more times
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 310), false, true, false);
+        10000, 310, 1), false, true, false);
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 310), false, true, false);
+        10000, 310, 1), false, true, false);
 
     // failedShufflesSinceLastCompletion has crossed the limits. Throw error
     verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -439,8 +440,8 @@ public class TestShuffleScheduler {
 
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, inputAttemptIdentifier);
     }
@@ -451,19 +452,19 @@ public class TestShuffleScheduler {
           new InputAttemptIdentifier(i, 0, "attempt_");
 
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
-          totalProducerNodes), 10000, i), false, true, false);
+          totalProducerNodes), 10000, i, 1), false, true, false);
 
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
-          totalProducerNodes), 10000, i), false, true, false);
+          totalProducerNodes), 10000, i, 1), false, true, false);
 
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
-          totalProducerNodes), 10000, i), false, true, false);
+          totalProducerNodes), 10000, i, 1), false, true, false);
 
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
       //319 succeeds
@@ -474,14 +475,14 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(319, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
 
     //stall the shuffle (but within limits)
     scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
@@ -491,13 +492,13 @@ public class TestShuffleScheduler {
     //Retry for 3 more times
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
 
     // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
     // well. However, it has failed only in one host. So this should proceed
@@ -508,7 +509,7 @@ public class TestShuffleScheduler {
     scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
         totalProducerNodes),
-        10000, 319), false, true, false);
+        10000, 319, 1), false, true, false);
     verify(shuffle, times(1)).reportException(any(Throwable.class));
 
   }
@@ -534,8 +535,8 @@ public class TestShuffleScheduler {
 
     //Generate 319 events (last event has not arrived)
     for (int i = 0; i < 319; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, inputAttemptIdentifier);
     }
@@ -548,14 +549,14 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(318, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes),
-        10000, 318), false, true, false);
+        10000, 318, 1), false, true, false);
 
     //stall the shuffle
     scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -565,13 +566,13 @@ public class TestShuffleScheduler {
     //Retry for 3 more times
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
         totalProducerNodes),
-        10000, 318), false, true, false);
+        10000, 318, 1), false, true, false);
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
         totalProducerNodes),
-        10000, 318), false, true, false);
+        10000, 318, 1), false, true, false);
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
         totalProducerNodes),
-        10000, 318), false, true, false);
+        10000, 318, 1), false, true, false);
 
     //Shuffle has not received the events completely. So do not bail out yet.
     verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -613,8 +614,8 @@ public class TestShuffleScheduler {
 
     //Generate 320 events (last event has not arrived)
     for (int i = 0; i < 320; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, inputAttemptIdentifier);
     }
@@ -627,7 +628,7 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
+          10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //5 fetches fail once
@@ -635,7 +636,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
     }
 
     assertTrue(scheduler.failureCounts.size() >= 5);
@@ -649,9 +650,9 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
-          10000, i), false, true, false);
+          10000, i, 1), false, true, false);
     }
 
     boolean checkFailedFetchSinceLastCompletion = conf.getBoolean
@@ -689,8 +690,8 @@ public class TestShuffleScheduler {
 
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
           inputAttemptIdentifier);
     }
@@ -703,7 +704,7 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
           100, 200, startTime + (i * 100), mapOutput, false);
     }
 
@@ -712,16 +713,16 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
           false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
           false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
           false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost("host" + (i % totalProducerNodes), 10000, i),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i, 1),
           false, true, false);
     }
 
@@ -752,8 +753,8 @@ public class TestShuffleScheduler {
     Shuffle shuffle = mock(Shuffle.class);
     final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle);
 
-    InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(0, 0, "attempt_");
+    CompositeInputAttemptIdentifier inputAttemptIdentifier =
+        new CompositeInputAttemptIdentifier(0, 0, "attempt_", 1);
     scheduler.addKnownMapOutput("host0", 10000, 0, inputAttemptIdentifier);
 
     assertTrue(scheduler.pendingHosts.size() == 1);
@@ -800,8 +801,8 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
 
       for (int i = 0; i < numInputs; i++) {
-        InputAttemptIdentifier inputAttemptIdentifier =
-            new InputAttemptIdentifier(i, 0, "attempt_");
+        CompositeInputAttemptIdentifier inputAttemptIdentifier =
+            new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
         scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
@@ -856,8 +857,8 @@ public class TestShuffleScheduler {
     InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
 
     for (int i = 0; i < numInputs; i++) {
-      InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(i, 0, "attempt_");
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
       scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
       identifiers[i] = inputAttemptIdentifier;
     }


Mime
View raw message