tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [2/2] tez git commit: TEZ-3361. Fetch Multiple Partitions from the Shuffle Handler (jeagles)
Date Tue, 06 Dec 2016 17:31:20 GMT
TEZ-3361. Fetch Multiple Partitions from the Shuffle Handler (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe6746d7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe6746d7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe6746d7

Branch: refs/heads/TEZ-3334
Commit: fe6746d7882510369090dce28e2e725565a73aa1
Parents: 25643aa
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Tue Dec 6 11:30:51 2016 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Tue Dec 6 11:30:51 2016 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |   1 +
 .../org/apache/tez/auxservices/IndexCache.java  |  51 ++-
 .../apache/tez/auxservices/ShuffleHandler.java  | 158 +++++---
 .../tez/auxservices/TestShuffleHandler.java     |  52 ++-
 .../common/CompositeInputAttemptIdentifier.java |  68 ++++
 .../library/common/shuffle/FetchResult.java     |  12 +-
 .../runtime/library/common/shuffle/Fetcher.java | 357 ++++++++++++-------
 .../library/common/shuffle/InputHost.java       |  63 +++-
 .../library/common/shuffle/ShuffleUtils.java    |   6 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |  85 +++--
 .../common/shuffle/impl/ShuffleManager.java     |  21 +-
 .../orderedgrouped/FetcherOrderedGrouped.java   | 292 ++++++++-------
 .../common/shuffle/orderedgrouped/MapHost.java  |   8 +-
 .../common/shuffle/orderedgrouped/Shuffle.java  |   3 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java |  77 +++-
 .../orderedgrouped/ShuffleScheduler.java        |  24 +-
 .../runtime/library/input/UnorderedKVInput.java |   4 +-
 .../library/common/shuffle/TestFetcher.java     |  26 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |  35 +-
 .../common/shuffle/impl/TestShuffleManager.java |   2 +-
 .../shuffle/orderedgrouped/TestFetcher.java     |  36 +-
 ...tShuffleInputEventHandlerOrderedGrouped.java |  29 +-
 .../orderedgrouped/TestShuffleScheduler.java    | 135 +++----
 23 files changed, 999 insertions(+), 546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index f93ec6e..3383e50 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3361. Fetch Multiple Partitions from the Shuffle Handler
   TEZ-3360. Tez Custom Shuffle Handler Documentation
   TEZ-3411. TestShuffleHandler#testSendMapCount should not used hard coded ShuffleHandler port
   TEZ-3412. Modify ShuffleHandler to use Constants.DAG_PREFIX and fix AttemptPathIdentifier#toString()

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
index 532187e..247144c 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java
@@ -46,6 +46,45 @@ class IndexCache {
   }
 
   /**
+   * This method gets the spill record for the given mapId.
+   * It reads the index file into cache if it is not already present.
+   * @param mapId
+   * @param fileName The file to read the index information from if it is not
+   *                 already present in the cache
+   * @param expectedIndexOwner The expected owner of the index file
+   * @return The spill record for this map
+   * @throws IOException
+   */
+  public TezSpillRecord getSpillRecord(String mapId, Path fileName, String expectedIndexOwner)
+      throws IOException {
+
+    IndexInformation info = cache.get(mapId);
+
+    if (info == null) {
+      info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
+    } else {
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      }
+    }
+
+    if (info.mapSpillRecord.size() == 0) {
+      throw new IOException("Invalid request " +
+          " Map Id = " + mapId + " Index Info Length = " + info.mapSpillRecord.size());
+    }
+    return info.mapSpillRecord;
+  }
+
+  /**
    * This method gets the index information for the given mapId and reduce.
    * It reads the index file into cache if it is not already present.
    * @param mapId
@@ -74,7 +113,9 @@ class IndexCache {
           }
         }
       }
-      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      }
     }
 
     if (info.mapSpillRecord.size() == 0 ||
@@ -108,10 +149,14 @@ class IndexCache {
           }
         }
       }
-      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      }
       return info;
     }
-    LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("IndexCache MISS: MapId " + mapId + " not found");
+    }
     TezSpillRecord tmp = null;
     try {
       tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index fdaba86..80b9d46 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
@@ -94,6 +95,7 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -306,20 +308,20 @@ public class ShuffleHandler extends AuxiliaryService {
     private List<String> mapIds;
     private AtomicInteger mapsToWait;
     private AtomicInteger mapsToSend;
-    private int reduceId;
+    private Range reduceRange;
     private ChannelHandlerContext ctx;
     private String user;
     private Map<String, Shuffle.MapOutputInfo> infoMap;
     private String jobId;
     private String dagId;
 
-    public ReduceContext(List<String> mapIds, int rId,
+    public ReduceContext(List<String> mapIds, Range reduceRange,
                          ChannelHandlerContext context, String usr,
                          Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
                          String jobId, String dagId) {
 
       this.mapIds = mapIds;
-      this.reduceId = rId;
+      this.reduceRange = reduceRange;
       this.dagId = dagId;
       /**
       * Atomic count for tracking the no. of map outputs that are yet to
@@ -340,8 +342,8 @@ public class ShuffleHandler extends AuxiliaryService {
       this.jobId = jobId;
     }
 
-    public int getReduceId() {
-      return reduceId;
+    public Range getReduceRange() {
+      return reduceRange;
     }
 
     public ChannelHandlerContext getCtx() {
@@ -798,6 +800,29 @@ public class ShuffleHandler extends AuxiliaryService {
 
   }
 
+  protected static class Range {
+    final int first;
+    final int last;
+
+    Range(int first, int last) {
+      this.first = first;
+      this.last = last;
+    }
+
+    int getFirst() {
+      return first;
+    }
+
+    int getLast() {
+      return last;
+    }
+
+    @Override
+    public String toString() {
+      return new String("range: " + first + "-" + last);
+    }
+  }
+
   class Shuffle extends SimpleChannelUpstreamHandler {
 
     private static final int MAX_WEIGHT = 10 * 1024 * 1024;
@@ -865,13 +890,26 @@ public class ShuffleHandler extends AuxiliaryService {
       if (null == mapq) {
         return null;
       }
-      final List<String> ret = new ArrayList<String>();
+      final List<String> ret = new ArrayList<>();
       for (String s : mapq) {
         Collections.addAll(ret, s.split(","));
       }
       return ret;
     }
 
+    private Range splitReduces(List<String> reduceq) {
+      if (null == reduceq || reduceq.size() != 1) {
+        return null;
+      }
+      String[] reduce = reduceq.get(0).split("-");
+      int first = Integer.parseInt(reduce[0]);
+      int last = first;
+      if (reduce.length > 1) {
+        last = Integer.parseInt(reduce[1]);
+      }
+      return new Range(first, last);
+    }
+
     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
         throws Exception {
@@ -915,13 +953,13 @@ public class ShuffleHandler extends AuxiliaryService {
         }
       }
       final List<String> mapIds = splitMaps(q.get("map"));
-      final List<String> reduceQ = q.get("reduce");
+      final Range reduceRange = splitReduces(q.get("reduce"));
       final List<String> jobQ = q.get("job");
       final List<String> dagIdQ = q.get("dag");
       if (LOG.isDebugEnabled()) {
         LOG.debug("RECV: " + request.getUri() +
             "\n  mapId: " + mapIds +
-            "\n  reduceId: " + reduceQ +
+            "\n  reduceId: " + reduceRange +
             "\n  jobId: " + jobQ +
             "\n  dagId: " + dagIdQ +
             "\n  keepAlive: " + keepAliveParam);
@@ -930,11 +968,11 @@ public class ShuffleHandler extends AuxiliaryService {
       if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ))  {
         return;
       }
-      if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) {
+      if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
         sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
         return;
       }
-      if (reduceQ.size() != 1 || jobQ.size() != 1) {
+      if (jobQ.size() != 1) {
         sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
         return;
       }
@@ -944,13 +982,11 @@ public class ShuffleHandler extends AuxiliaryService {
       // on log4j.properties by uncommenting the setting
       if (AUDITLOG.isDebugEnabled()) {
         AUDITLOG.debug("shuffle for " + jobQ.get(0) +
-                         " reducer " + reduceQ.get(0));
+                         " reducer " + reduceRange);
       }
-      int reduceId;
       String jobId;
       String dagId;
       try {
-        reduceId = Integer.parseInt(reduceQ.get(0));
         jobId = jobQ.get(0);
         dagId = dagIdQ.get(0);
       } catch (NumberFormatException e) {
@@ -982,7 +1018,7 @@ public class ShuffleHandler extends AuxiliaryService {
       String user = userRsrc.get(jobId);
 
       try {
-        populateHeaders(mapIds, jobId, dagId, user, reduceId, request,
+        populateHeaders(mapIds, jobId, dagId, user, reduceRange,
           response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
         ch.write(response);
@@ -993,7 +1029,7 @@ public class ShuffleHandler extends AuxiliaryService {
       }
       ch.write(response);
       //Initialize one ReduceContext object per messageReceived call
-      ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+      ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
           user, mapOutputInfoMap, jobId, dagId);
       for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
         ChannelFuture nextMap = sendMap(reduceContext);
@@ -1050,14 +1086,14 @@ public class ShuffleHandler extends AuxiliaryService {
           MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
           if (info == null) {
             info = getMapOutputInfo(reduceContext.dagId, mapId,
-                reduceContext.getReduceId(), reduceContext.getJobId(),
+                reduceContext.getJobId(),
                 reduceContext.getUser());
           }
           nextMap = sendMapOutput(
               reduceContext.getCtx(),
               reduceContext.getCtx().getChannel(),
               reduceContext.getUser(), mapId,
-              reduceContext.getReduceId(), info);
+              reduceContext.getReduceRange(), info);
           if (null == nextMap) {
             sendError(reduceContext.getCtx(), NOT_FOUND);
             return null;
@@ -1103,7 +1139,7 @@ public class ShuffleHandler extends AuxiliaryService {
     }
 
     protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                             int reduce, String jobId,
+                                             String jobId,
                                              String user) throws IOException {
       AttemptPathInfo pathInfo;
       try {
@@ -1123,8 +1159,8 @@ public class ShuffleHandler extends AuxiliaryService {
         }
       }
 
-      TezIndexRecord info =
-        indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
+      TezSpillRecord spillRecord =
+          indexCache.getSpillRecord(mapId, pathInfo.indexPath, user);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
@@ -1132,13 +1168,14 @@ public class ShuffleHandler extends AuxiliaryService {
             pathInfo.indexPath);
       }
 
-      MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
+
+      MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord);
       return outputInfo;
     }
 
     protected void populateHeaders(List<String> mapIds, String jobId,
                                    String dagId, String user,
-                                   int reduce, HttpRequest request,
+                                   Range reduceRange,
                                    HttpResponse response,
                                    boolean keepAliveParam,
                                    Map<String, MapOutputInfo> mapOutputInfoMap)
@@ -1146,20 +1183,21 @@ public class ShuffleHandler extends AuxiliaryService {
 
       long contentLength = 0;
       for (String mapId : mapIds) {
-        MapOutputInfo outputInfo =
-            getMapOutputInfo(dagId, mapId, reduce, jobId, user);
+        MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user);
         if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
           mapOutputInfoMap.put(mapId, outputInfo);
         }
-
-        ShuffleHeader header =
-            new ShuffleHeader(mapId, outputInfo.indexRecord.getPartLength(),
-            outputInfo.indexRecord.getRawLength(), reduce);
         DataOutputBuffer dob = new DataOutputBuffer();
-        header.write(dob);
-
-        contentLength += outputInfo.indexRecord.getPartLength();
-        contentLength += dob.getLength();
+        for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
+          TezIndexRecord indexRecord = outputInfo.spillRecord.getIndex(reduce);
+          ShuffleHeader header =
+              new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce);
+          dob.reset();
+          header.write(dob);
+
+          contentLength += dob.getLength();
+          contentLength += indexRecord.getPartLength();
+        }
       }
 
       // Now set the response headers.
@@ -1185,11 +1223,11 @@ public class ShuffleHandler extends AuxiliaryService {
 
     class MapOutputInfo {
       final Path mapOutputFileName;
-      final TezIndexRecord indexRecord;
+      final TezSpillRecord spillRecord;
 
-      MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) {
+      MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord) {
         this.mapOutputFileName = mapOutputFileName;
-        this.indexRecord = indexRecord;
+        this.spillRecord = spillRecord;
       }
     }
 
@@ -1235,28 +1273,46 @@ public class ShuffleHandler extends AuxiliaryService {
     }
 
     protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
-        String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
+                                          String user, String mapId, Range reduceRange, MapOutputInfo outputInfo)
         throws IOException {
-      final TezIndexRecord info = mapOutputInfo.indexRecord;
-      final ShuffleHeader header =
-        new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
-      final DataOutputBuffer dob = new DataOutputBuffer();
-      header.write(dob);
-      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      final File spillfile =
-          new File(mapOutputInfo.mapOutputFileName.toString());
+      TezIndexRecord firstIndex = null;
+      TezIndexRecord lastIndex = null;
+
+      DataOutputBuffer dobRange = new DataOutputBuffer();
+      // Indicate how many record to be written
+      WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1);
+      ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
+      for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
+        TezIndexRecord index = outputInfo.spillRecord.getIndex(reduce);
+        // Records are only valid if they have a non-zero part length
+        if (index.getPartLength() != 0) {
+          if (firstIndex == null) {
+            firstIndex = index;
+          }
+          lastIndex = index;
+        }
+
+        ShuffleHeader header = new ShuffleHeader(mapId, index.getPartLength(), index.getRawLength(), reduce);
+        DataOutputBuffer dob = new DataOutputBuffer();
+        header.write(dob);
+        ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      }
+
+      final long rangeOffset = firstIndex.getStartOffset();
+      final long rangePartLength = lastIndex.getStartOffset() + lastIndex.getPartLength() - firstIndex.getStartOffset();
+      final File spillFile = new File(outputInfo.mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
-        spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
+        spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
       } catch (FileNotFoundException e) {
-        LOG.info(spillfile + " not found");
+        LOG.info(spillFile + " not found");
         return null;
       }
       ChannelFuture writeFuture;
       if (ch.getPipeline().get(SslHandler.class) == null) {
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath(),
+            rangeOffset, rangePartLength, manageOsCache, readaheadLength,
+            readaheadPool, spillFile.getAbsolutePath(),
             shuffleBufferSize, shuffleTransferToAllowed);
         writeFuture = ch.write(partition);
         writeFuture.addListener(new ChannelFutureListener() {
@@ -1273,13 +1329,13 @@ public class ShuffleHandler extends AuxiliaryService {
       } else {
         // HTTPS cannot be done with zero copy.
         final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            info.getStartOffset(), info.getPartLength(), sslFileBufferSize,
+            rangeOffset, rangePartLength, sslFileBufferSize,
             manageOsCache, readaheadLength, readaheadPool,
-            spillfile.getAbsolutePath());
+            spillFile.getAbsolutePath());
         writeFuture = ch.write(chunk);
       }
       metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(info.getPartLength()); // optimistic
+      metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic
       return writeFuture;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index ebd9c5d..a790b9a 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -62,8 +62,6 @@ import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -110,7 +108,7 @@ public class TestShuffleHandler {
         }
         @Override
         protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                 int reduce, String jobId,
+                                                 String jobId,
                                                  String user)
             throws IOException {
           // Do nothing.
@@ -118,17 +116,16 @@ public class TestShuffleHandler {
         }
         @Override
         protected void populateHeaders(List<String> mapIds, String jobId,
-                                       String dagId, String user, int reduce,
-                                       HttpRequest request,
+                                       String dagId, String user, Range reduceRange,
                                        HttpResponse response,
                                        boolean keepAliveParam,
-            Map<String, MapOutputInfo> infoMap) throws IOException {
+                                       Map<String, MapOutputInfo> infoMap) throws IOException {
           // Do nothing.
         }
         @Override
         protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-            Channel ch, String user, String mapId, int reduce,
-            MapOutputInfo info) throws IOException {
+                                              Channel ch, String user, String mapId, Range reduceRange,
+                                              MapOutputInfo info) throws IOException {
 
           ShuffleHeader header =
               new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
@@ -236,18 +233,17 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                   int reduce, String jobId,
+                                                   String jobId,
                                                    String user)
               throws IOException {
             return null;
           }
           @Override
           protected void populateHeaders(List<String> mapIds, String jobId,
-                                         String dagId, String user, int reduce,
-                                         HttpRequest request,
+                                         String dagId, String user, Range reduceRange,
                                          HttpResponse response,
                                          boolean keepAliveParam,
-              Map<String, MapOutputInfo> infoMap) throws IOException {
+                                         Map<String, MapOutputInfo> infoMap) throws IOException {
             // Only set response headers and skip everything else
             // send some dummy value for content-length
             super.setResponseHeaders(response, keepAliveParam, 100);
@@ -259,8 +255,8 @@ public class TestShuffleHandler {
           }
           @Override
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info)
+                                                Channel ch, String user, String mapId, Range reduceRange,
+                                                MapOutputInfo info)
                   throws IOException {
             // send a shuffle header and a lot of data down the channel
             // to trigger a broken pipe
@@ -335,7 +331,6 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                   int reduce,
                                                    String jobId, String user)
               throws IOException {
             return null;
@@ -349,7 +344,7 @@ public class TestShuffleHandler {
           @Override
           protected void populateHeaders(List<String> mapIds, String jobId,
                                          String dagId, String user,
-                                         int reduce, HttpRequest request,
+                                         Range reduceRange,
                                          HttpResponse response,
                                          boolean keepAliveParam,
                                          Map<String, MapOutputInfo> infoMap)
@@ -376,8 +371,8 @@ public class TestShuffleHandler {
 
           @Override
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info) throws IOException {
+                                                Channel ch, String user, String mapId, Range reduceRange,
+                                                MapOutputInfo info) throws IOException {
             HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
 
             // send a shuffle header and a lot of data down the channel
@@ -549,7 +544,7 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
-                                                   int reduce, String jobId,
+                                                   String jobId,
                                                    String user)
               throws IOException {
             // Do nothing.
@@ -557,11 +552,10 @@ public class TestShuffleHandler {
           }
           @Override
           protected void populateHeaders(List<String> mapIds, String jobId,
-                                         String dagId, String user, int reduce,
-                                         HttpRequest request,
+                                         String dagId, String user, Range reduceRange,
                                          HttpResponse response,
                                          boolean keepAliveParam,
-              Map<String, MapOutputInfo> infoMap) throws IOException {
+                                         Map<String, MapOutputInfo> infoMap) throws IOException {
             // Do nothing.
           }
           @Override
@@ -572,8 +566,8 @@ public class TestShuffleHandler {
           }
           @Override
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info)
+                                                Channel ch, String user, String mapId, Range reduceRange,
+                                                MapOutputInfo info)
                   throws IOException {
             // send a shuffle header and a lot of data down the channel
             // to trigger a broken pipe
@@ -985,9 +979,9 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected void populateHeaders(List<String> mapIds,
-              String outputBaseStr, String dagId, String user, int reduce,
-              HttpRequest request, HttpResponse response,
-              boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
+                                         String outputBaseStr, String dagId, String user, Range reduceRange,
+                                         HttpResponse response,
+                                         boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
               throws IOException {
             // Only set response headers and skip everything else
             // send some dummy value for content-length
@@ -1009,8 +1003,8 @@ public class TestShuffleHandler {
           }
           @Override
           protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info) throws IOException {
+                                                Channel ch, String user, String mapId, Range reduceRange,
+                                                MapOutputInfo info) throws IOException {
             // send a shuffle header
             ShuffleHeader header =
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java
new file mode 100644
index 0000000..30295bd
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * An {@link InputAttemptIdentifier} that additionally carries a count of
+ * consecutive input identifiers it represents, so one identifier can stand in
+ * for a contiguous range of inputs produced by a single source attempt
+ * (used for fetching multiple partitions in one shuffle request).
+ */
+@Private
+public class CompositeInputAttemptIdentifier extends InputAttemptIdentifier {
+  // Number of consecutive inputs represented, starting at getInputIdentifier().
+  private final int inputIdentifierCount;
+
+  public CompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, int inputIdentifierCount) {
+    this(inputIdentifier, attemptNumber, pathComponent, false, SPILL_INFO.FINAL_MERGE_ENABLED, -1, inputIdentifierCount);
+  }
+
+  public CompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean isShared, int inputIdentifierCount) {
+    this(inputIdentifier, attemptNumber, pathComponent, isShared, SPILL_INFO.FINAL_MERGE_ENABLED, -1, inputIdentifierCount);
+  }
+
+  public CompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent,
+      boolean shared, SPILL_INFO fetchTypeInfo, int spillEventId, int inputIdentifierCount) {
+    super(inputIdentifier, attemptNumber, pathComponent, shared, fetchTypeInfo, spillEventId);
+    this.inputIdentifierCount = inputIdentifierCount;
+  }
+
+
+  public int getInputIdentifierCount() {
+    return inputIdentifierCount;
+  }
+
+  /**
+   * Returns a plain {@link InputAttemptIdentifier} for the input at the given
+   * offset within this composite range, preserving attempt number, path
+   * component, shared flag and spill metadata.
+   */
+  public InputAttemptIdentifier expand(int inputIdentifierOffset) {
+    return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId());
+  }
+
+  // PathComponent & shared does not need to be part of the hashCode and equals computation.
+  // NOTE(review): inputIdentifierCount is also excluded (these delegate to super), so two
+  // composites differing only in count compare equal — confirm this is intended.
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  @Override
+  public String toString() {
+    return super.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
index d9595f0..9a5890d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
@@ -42,19 +42,21 @@ public class FetchResult {
   private final String host;
   private final int port;
   private final int partition;
+  private final int partitionCount;
   private final Iterable<InputAttemptIdentifier> pendingInputs;
   private final String additionalInfo;
 
-  public FetchResult(String host, int port, int partition,
+  public FetchResult(String host, int port, int partition, int partitionCount,
       Iterable<InputAttemptIdentifier> pendingInputs) {
-    this(host, port, partition, pendingInputs, null);
+    this(host, port, partition, partitionCount, pendingInputs, null);
   }
 
-  public FetchResult(String host, int port, int partition,
+  public FetchResult(String host, int port, int partition, int partitionCount,
       Iterable<InputAttemptIdentifier> pendingInputs, String additionalInfo) {
     this.host = host;
     this.port = port;
     this.partition = partition;
+    this.partitionCount = partitionCount;
     this.pendingInputs = pendingInputs;
     this.additionalInfo = additionalInfo;
   }
@@ -71,6 +73,10 @@ public class FetchResult {
     return partition;
   }
 
+  public int getPartitionCount() {
+    return partitionCount;
+  }
+
   public Iterable<InputAttemptIdentifier> getPendingInputs() {
     return pendingInputs;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 896f532..7b1abab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -27,6 +27,7 @@ import java.net.URL;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,8 +41,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.lang.StringUtils;
@@ -72,6 +75,50 @@ import com.google.common.base.Preconditions;
  */
 public class Fetcher extends CallableWithNdc<FetchResult> {
 
+  /**
+   * Map key pairing a map output's path component with a partition number.
+   * Used by pathToAttemptMap to resolve a shuffle response header (mapId +
+   * partition) back to the {@link InputAttemptIdentifier} it belongs to when
+   * multiple partitions are fetched from one source attempt.
+   */
+  public static class PathPartition {
+
+    final String path;
+    final int partition;
+
+    PathPartition(String path, int partition) {
+      this.path = path;
+      this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((path == null) ? 0 : path.hashCode());
+      result = prime * result + partition;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      PathPartition other = (PathPartition) obj;
+      if (path == null) {
+        // null path only equals another null path
+        if (other.path != null)
+          return false;
+      } else if (!path.equals(other.path))
+        return false;
+      if (partition != other.partition)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "PathPartition [path=" + path + ", partition=" + partition + "]";
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
 
   private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
@@ -117,9 +164,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   private int port;
   private int partition;
+  private int partitionCount;
 
   // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
-  private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
+  private final Map<PathPartition, InputAttemptIdentifier> pathToAttemptMap;
 
   private URL url;
   private volatile DataInputStream input;
@@ -138,6 +186,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private long retryStartTime = 0;
 
   private final boolean asyncHttp;
+  private final boolean compositeFetch;
 
   private final boolean verifyDiskChecksum;
 
@@ -152,7 +201,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       boolean localDiskFetchEnabled,
       boolean sharedFetchEnabled,
       String localHostname,
-      int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
+      int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
     this.asyncHttp = asyncHttp;
     this.verifyDiskChecksum = verifyDiskChecksum;
     this.fetcherCallback = fetcherCallback;
@@ -160,7 +209,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     this.jobTokenSecretMgr = jobTokenSecretManager;
     this.appId = appId;
     this.dagIdentifier = dagIdentifier;
-    this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
+    this.pathToAttemptMap = new HashMap<PathPartition, InputAttemptIdentifier>();
     this.httpConnectionParams = params;
     this.conf = conf;
 
@@ -175,6 +224,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     this.lockPath = lockPath;
     this.localHostname = localHostname;
     this.shufflePort = shufflePort;
+    this.compositeFetch = compositeFetch;
 
     try {
       if (this.sharedFetchEnabled) {
@@ -200,12 +250,20 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);
 
     if (srcAttempts.size() == 0) {
-      return new FetchResult(host, port, partition, srcAttempts);
+      return new FetchResult(host, port, partition, partitionCount, srcAttempts);
     }
 
     populateRemainingMap(srcAttempts);
     for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) {
-      pathToAttemptMap.put(in.getPathComponent(), in);
+      if (in instanceof CompositeInputAttemptIdentifier) {
+        CompositeInputAttemptIdentifier cin = (CompositeInputAttemptIdentifier)in;
+        for (int i = 0; i < cin.getInputIdentifierCount(); i++) {
+          pathToAttemptMap.put(new PathPartition(cin.getPathComponent(), i), cin.expand(i));
+        }
+      } else {
+        pathToAttemptMap.put(new PathPartition(in.getPathComponent(), 0), in);
+      }
+
       // do only if all of them are shared fetches
       multiplex &= in.isShared();
     }
@@ -390,7 +448,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       lock = getLock();
       if (lock == null) {
         // re-queue until we get a lock
-        return new HostFetchResult(new FetchResult(host, port, partition,
+        return new HostFetchResult(new FetchResult(host, port, partition, partitionCount,
             srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false);
       } else {
         if (findInputs() == srcAttemptsRemaining.size()) {
@@ -416,7 +474,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     if (isShutDown.get()) {
       // if any exception was due to shut-down don't bother firing any more
       // requests
-      return new HostFetchResult(new FetchResult(host, port, partition,
+      return new HostFetchResult(new FetchResult(host, port, partition, partitionCount,
           srcAttemptsRemaining.values()), null, false);
     }
     // no more caching
@@ -431,7 +489,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) {
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-          port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle());
+          port, partition, partitionCount, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle());
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
           httpConnectionParams.isKeepAlive());
 
@@ -456,7 +514,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         failedFetches = srcAttemptsRemaining.values().
             toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
       }
-      return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedFetches, true);
+      return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedFetches, true);
     }
     if (isShutDown.get()) {
       // shutdown would have no effect if in the process of establishing the connection.
@@ -464,7 +522,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       if (isDebugEnabled) {
         LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
       }
-      return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false);
+      return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false);
     }
 
     try {
@@ -486,7 +544,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         InputAttemptIdentifier firstAttempt = attempts.iterator().next();
         LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
             + " Informing ShuffleManager: ", e);
-        return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()),
+        return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()),
             new InputAttemptIdentifier[] { firstAttempt }, false);
       }
     } catch (InterruptedException e) {
@@ -514,7 +572,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       if (isDebugEnabled) {
         LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
       }
-      return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false);
+      return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false);
     }
     // After this point, closing the stream and connection, should cause a
     // SocketException,
@@ -532,7 +590,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           LOG.debug("Fetcher already shutdown. Aborting queued fetches for " +
               srcAttemptsRemaining.size() + " inputs");
         }
-        return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null,
+        return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null,
             false);
       }
       try {
@@ -545,7 +603,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
             LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " +
                 srcAttemptsRemaining.size() + " inputs");
           }
-          return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null,
+          return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null,
               false);
         }
         // Connect again.
@@ -563,7 +621,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       }
       failedInputs = null;
     }
-    return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedInputs,
+    return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedInputs,
         false);
   }
 
@@ -650,7 +708,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     } else {
       // nothing needs to be done to requeue remaining entries
     }
-    return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()),
+    return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()),
         failedFetches, false);
   }
 
@@ -732,125 +790,166 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
   }
 
+  /**
+   * Immutable record of one map output's shuffle header, captured while the
+   * headers for all fetched partitions are read ahead of their payloads:
+   * source attempt, decompressed/compressed lengths, and the target reduce.
+   */
+  private static class MapOutputStat {
+    final InputAttemptIdentifier srcAttemptId;
+    final long decompressedLength;
+    final long compressedLength;
+    final int forReduce;
+
+    MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) {
+      this.srcAttemptId = srcAttemptId;
+      this.decompressedLength = decompressedLength;
+      this.compressedLength = compressedLength;
+      this.forReduce = forReduce;
+    }
+
+    @Override
+    public String toString() {
+      // Plain concatenation; wrapping the result in new String(...) was a redundant allocation.
+      return "id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce;
+    }
+  }
   private InputAttemptIdentifier[] fetchInputs(DataInputStream input,
       CachingCallBack callback) throws FetcherReadTimeoutException {
     FetchedInput fetchedInput = null;
     InputAttemptIdentifier srcAttemptId = null;
-    long decompressedLength = -1;
-    long compressedLength = -1;
-
+    long decompressedLength = 0;
+    long compressedLength = 0;
     try {
       long startTime = System.currentTimeMillis();
-      int responsePartition = -1;
-      // Read the shuffle header
-      String pathComponent = null;
-      try {
-        ShuffleHeader header = new ShuffleHeader();
-        header.readFields(input);
-        pathComponent = header.getMapId();
-
-        srcAttemptId = pathToAttemptMap.get(pathComponent);
-        compressedLength = header.getCompressedLength();
-        decompressedLength = header.getUncompressedLength();
-        responsePartition = header.getPartition();
-      } catch (IllegalArgumentException e) {
-        // badIdErrs.increment(1);
-        if (!isShutDown.get()) {
-          LOG.warn("Invalid src id ", e);
-          // Don't know which one was bad, so consider all of them as bad
-          return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
-        } else {
-          if (isDebugEnabled) {
-            LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
-          }
-          return null;
-        }
+      int partitionCount = 1;
+
+      if (this.compositeFetch) {
+        // Multiple partitions are fetched
+        partitionCount = WritableUtils.readVInt(input);
       }
+      ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount);
+      for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) {
+        MapOutputStat mapOutputStat = null;
+        int responsePartition = -1;
+        // Read the shuffle header
+        String pathComponent = null;
+        try {
+          ShuffleHeader header = new ShuffleHeader();
+          header.readFields(input);
+          pathComponent = header.getMapId();
+          srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition()));
+
+          if (header.getCompressedLength() == 0) {
+            // Empty partitions are already accounted for
+            continue;
+          }
 
-      // Do some basic sanity verification
-      if (!verifySanity(compressedLength, decompressedLength,
-          responsePartition, srcAttemptId, pathComponent)) {
-        if (!isShutDown.get()) {
-          if (srcAttemptId == null) {
-            LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
-            srcAttemptId = getNextRemainingAttempt();
+          mapOutputStat = new MapOutputStat(srcAttemptId,
+              header.getUncompressedLength(),
+              header.getCompressedLength(),
+              header.getPartition());
+          mapOutputStats.add(mapOutputStat);
+          responsePartition = header.getPartition();
+        } catch (IllegalArgumentException e) {
+          // badIdErrs.increment(1);
+          if (!isShutDown.get()) {
+            LOG.warn("Invalid src id ", e);
+            // Don't know which one was bad, so consider all of them as bad
+            return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
+          } else {
+            if (isDebugEnabled) {
+              LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
+            }
+            return null;
           }
-          assert (srcAttemptId != null);
-          return new InputAttemptIdentifier[]{srcAttemptId};
-        } else {
-          if (isDebugEnabled) {
-            LOG.debug("Already shutdown. Ignoring verification failure.");
+        }
+
+        // Do some basic sanity verification
+        if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength,
+            responsePartition, mapOutputStat.srcAttemptId, pathComponent)) {
+          if (!isShutDown.get()) {
+            srcAttemptId = mapOutputStat.srcAttemptId;
+            if (srcAttemptId == null) {
+              LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+              srcAttemptId = getNextRemainingAttempt();
+            }
+            assert (srcAttemptId != null);
+            return new InputAttemptIdentifier[]{srcAttemptId};
+          } else {
+            if (isDebugEnabled) {
+              LOG.debug("Already shutdown. Ignoring verification failure.");
+            }
+            return null;
           }
-          return null;
         }
-      }
 
-      if (isDebugEnabled) {
-        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
-            + ", decomp len: " + decompressedLength);
-      }
-      
-      // TODO TEZ-957. handle IOException here when Broadcast has better error checking
-      if (srcAttemptId.isShared() && callback != null) {
-        // force disk if input is being shared
-        fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
-            compressedLength, srcAttemptId);
-      } else {
-        fetchedInput = inputManager.allocate(decompressedLength,
-            compressedLength, srcAttemptId);
-      }
-      // No concept of WAIT at the moment.
-      // // Check if we can shuffle *now* ...
-      // if (fetchedInput.getType() == FetchedInput.WAIT) {
-      // LOG.info("fetcher#" + id +
-      // " - MergerManager returned Status.WAIT ...");
-      // //Not an error but wait to process data.
-      // return EMPTY_ATTEMPT_ID_ARRAY;
-      // }
-
-      // Go!
-      if (isDebugEnabled) {
-        LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
-            + fetchedInput.getInputAttemptIdentifier() + " decomp: "
-            + decompressedLength + " len: " + compressedLength + " to "
-            + fetchedInput.getType());
+        if (isDebugEnabled) {
+          LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength
+              + ", decomp len: " + mapOutputStat.decompressedLength);
+        }
       }
 
-      if (fetchedInput.getType() == Type.MEMORY) {
-        ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
-          input, (int) decompressedLength, (int) compressedLength, codec,
-          ifileReadAhead, ifileReadAheadLength, LOG,
-          fetchedInput.getInputAttemptIdentifier().toString());
-      } else if (fetchedInput.getType() == Type.DISK) {
-        ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
-          (host +":" +port), input, compressedLength, decompressedLength, LOG,
-          fetchedInput.getInputAttemptIdentifier().toString(),
-          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
-      } else {
-        throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
-            fetchedInput);
-      }
+      for (MapOutputStat mapOutputStat : mapOutputStats) {
+        // Get the location for the map output - either in-memory or on-disk
+        srcAttemptId = mapOutputStat.srcAttemptId;
+        decompressedLength = mapOutputStat.decompressedLength;
+        compressedLength = mapOutputStat.compressedLength;
+        // TODO TEZ-957. handle IOException here when Broadcast has better error checking
+        if (srcAttemptId.isShared() && callback != null) {
+          // force disk if input is being shared
+          fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
+              compressedLength, srcAttemptId);
+        } else {
+          fetchedInput = inputManager.allocate(decompressedLength,
+              compressedLength, srcAttemptId);
+        }
+        // No concept of WAIT at the moment.
+        // // Check if we can shuffle *now* ...
+        // if (fetchedInput.getType() == FetchedInput.WAIT) {
+        // LOG.info("fetcher#" + id +
+        // " - MergerManager returned Status.WAIT ...");
+        // //Not an error but wait to process data.
+        // return EMPTY_ATTEMPT_ID_ARRAY;
+        // }
+
+        // Go!
+        if (isDebugEnabled) {
+          LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
+              + fetchedInput.getInputAttemptIdentifier() + " decomp: "
+              + decompressedLength + " len: " + compressedLength + " to "
+              + fetchedInput.getType());
+        }
 
-      // offer the fetched input for caching
-      if (srcAttemptId.isShared() && callback != null) {
-        // this has to be before the fetchSucceeded, because that goes across
-        // threads into the reader thread and can potentially shutdown this thread
-        // while it is still caching.
-        callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
-      }
+        if (fetchedInput.getType() == Type.MEMORY) {
+          ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
+              input, (int) decompressedLength, (int) compressedLength, codec,
+              ifileReadAhead, ifileReadAheadLength, LOG,
+              fetchedInput.getInputAttemptIdentifier().toString());
+        } else if (fetchedInput.getType() == Type.DISK) {
+          ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
+              (host + ":" + port), input, compressedLength, decompressedLength, LOG,
+              fetchedInput.getInputAttemptIdentifier().toString(),
+              ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
+        } else {
+          throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
+              fetchedInput);
+        }
 
-      // Inform the shuffle scheduler
-      long endTime = System.currentTimeMillis();
-      // Reset retryStartTime as map task make progress if retried before.
-      retryStartTime = 0;
-      fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
-          compressedLength, decompressedLength, (endTime - startTime));
+        // offer the fetched input for caching
+        if (srcAttemptId.isShared() && callback != null) {
+          // this has to be before the fetchSucceeded, because that goes across
+          // threads into the reader thread and can potentially shutdown this thread
+          // while it is still caching.
+          callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
+        }
+
+        // Inform the shuffle scheduler
+        long endTime = System.currentTimeMillis();
+        // Reset retryStartTime as map task make progress if retried before.
+        retryStartTime = 0;
+        fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
+            compressedLength, decompressedLength, (endTime - startTime));
 
-      // Note successful shuffle
-      srcAttemptsRemaining.remove(srcAttemptId.toString());
+        // Note successful shuffle
+        srcAttemptsRemaining.remove(srcAttemptId.toString());
 
-      // metrics.successFetch();
-      return null;
+        // metrics.successFetch();
+      }
     } catch (IOException ioe) {
       if (isShutDown.get()) {
         cleanupFetchedInput(fetchedInput);
@@ -887,6 +986,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // metrics.failedFetch();
       return new InputAttemptIdentifier[] { srcAttemptId };
     }
+    return null;
   }
 
   private void cleanupFetchedInput(FetchedInput fetchedInput) {
@@ -951,7 +1051,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       return false;
     }
 
-    if (fetchPartition != this.partition) {
+    if (fetchPartition < this.partition || fetchPartition >= this.partition + this.partitionCount) {
       // wrongReduceErrs.increment(1);
       LOG.warn(" data for the wrong reduce -> headerPathComponent: "
           + pathComponent + "nextRemainingSrcAttemptId: "
@@ -960,16 +1060,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           + " for reduce " + fetchPartition);
       return false;
     }
-
-    // Sanity check
-    // we are guaranteed that key is not null
-    if (srcAttemptsRemaining.get(srcAttemptId.toString()) == null) {
-      // wrongMapErrs.increment(1);
-      LOG.warn("Invalid input. Received output for headerPathComponent: "
-          + pathComponent + "nextRemainingSrcAttemptId: "
-          + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
-      return false;
-    }
     return true;
   }
   
@@ -992,10 +1082,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         HttpConnectionParams params, FetchedInputAllocator inputManager,
         ApplicationId appId, int dagIdentifier,  JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
         Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
-        boolean asyncHttp, boolean verifyDiskChecksum) {
+        boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
-          false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum);
+          false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -1004,11 +1094,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
-        String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
+        String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
           lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
-          verifyDiskChecksum);
+          verifyDiskChecksum, compositeFetch);
     }
 
     public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
@@ -1027,11 +1117,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       return this;
     }
 
-    public FetcherBuilder assignWork(String host, int port, int partition,
+    public FetcherBuilder assignWork(String host, int port, int partition, int partitionCount,
         List<InputAttemptIdentifier> inputs) {
       fetcher.host = host;
       fetcher.port = port;
       fetcher.partition = partition;
+      fetcher.partitionCount = partitionCount;
       fetcher.srcAttempts = inputs;
       workAssigned = true;
       return this;

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
index 969e06c..88dacb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
@@ -34,11 +34,48 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
  */
 public class InputHost extends HostPort {
 
+  private static class PartitionRange {
+
+    private final int partition;
+    private final int partitionCount;
+
+    PartitionRange(int partition, int partitionCount) {
+      this.partition = partition;
+      this.partitionCount = partitionCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      PartitionRange that = (PartitionRange) o;
+
+      if (partition != that.partition) return false;
+      return partitionCount == that.partitionCount;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = partition;
+      result = 31 * result + partitionCount;
+      return result;
+    }
+
+    public int getPartition() {
+      return partition;
+    }
+
+    public int getPartitionCount() {
+      return partitionCount;
+    }
+  }
+
   private String additionalInfo;
 
   // Each input host can support more than one partition.
   // Each partition has a list of inputs for pipelined shuffle.
-  private final Map<Integer, BlockingQueue<InputAttemptIdentifier>>
+  private final Map<PartitionRange, BlockingQueue<InputAttemptIdentifier>>
       partitionToInputs = new ConcurrentHashMap<>();
 
   public InputHost(HostPort hostPort) {
@@ -57,24 +94,25 @@ public class InputHost extends HostPort {
     return partitionToInputs.size();
   }
 
-  public synchronized void addKnownInput(Integer partition,
+  public synchronized void addKnownInput(int partition, int partitionCount,
       InputAttemptIdentifier srcAttempt) {
+    PartitionRange partitionRange = new PartitionRange(partition, partitionCount);
     BlockingQueue<InputAttemptIdentifier> inputs =
-        partitionToInputs.get(partition);
+        partitionToInputs.get(partitionRange);
     if (inputs == null) {
       inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
-      partitionToInputs.put(partition, inputs);
+      partitionToInputs.put(partitionRange, inputs);
     }
     inputs.add(srcAttempt);
   }
 
   public synchronized PartitionToInputs clearAndGetOnePartition() {
-    for (Map.Entry<Integer, BlockingQueue<InputAttemptIdentifier>> entry :
+    for (Map.Entry<PartitionRange, BlockingQueue<InputAttemptIdentifier>> entry :
         partitionToInputs.entrySet()) {
       List<InputAttemptIdentifier> inputs =
           new ArrayList<InputAttemptIdentifier>(entry.getValue().size());
       entry.getValue().drainTo(inputs);
-      PartitionToInputs ret = new PartitionToInputs(entry.getKey(), inputs);
+      PartitionToInputs ret = new PartitionToInputs(entry.getKey().getPartition(), entry.getKey().getPartitionCount(), inputs);
       partitionToInputs.remove(entry.getKey());
       return ret;
     }
@@ -103,12 +141,13 @@ public class InputHost extends HostPort {
   }
 
   public static class PartitionToInputs {
-    private int partition;
+    private final int partition;
+    private final int partitionCount;
     private List<InputAttemptIdentifier> inputs;
 
-    public PartitionToInputs(int partition,
-        List<InputAttemptIdentifier> input) {
+    public PartitionToInputs(int partition, int partitionCount, List<InputAttemptIdentifier> input) {
       this.partition = partition;
+      this.partitionCount = partitionCount;
       this.inputs = input;
     }
 
@@ -116,13 +155,17 @@ public class InputHost extends HostPort {
       return partition;
     }
 
+    public int getPartitionCount() {
+      return partitionCount;
+    }
+
     public List<InputAttemptIdentifier> getInputs() {
       return inputs;
     }
 
     @Override
     public String toString() {
-      return "partition=" + partition + ", inputs=" + inputs;
+      return "partition=" + partition + ", partitionCount=" + partitionCount + ", inputs=" + inputs;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 6fa43e8..1d644aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -193,7 +193,7 @@ public class ShuffleUtils {
   }
 
   public static StringBuilder constructBaseURIForShuffleHandler(String host,
-      int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
+      int port, int partition, int partitionCount, String appId, int dagIdentifier, boolean sslShuffle) {
     final String http_protocol = (sslShuffle) ? "https://" : "http://";
     StringBuilder sb = new StringBuilder(http_protocol);
     sb.append(host);
@@ -206,6 +206,10 @@ public class ShuffleUtils {
     sb.append(String.valueOf(dagIdentifier));
     sb.append("&reduce=");
     sb.append(String.valueOf(partition));
+    if (partitionCount > 1) {
+      sb.append("-");
+      sb.append(String.valueOf(partition + partitionCount - 1));
+    }
     sb.append("&map=");
     return sb;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index c1893fc..a80d21b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -28,6 +28,7 @@ import java.util.zip.Inflater;
 import com.google.protobuf.ByteString;
 
 import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -61,6 +62,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private final int ifileReadAheadLength;
   private final boolean useSharedInputs;
   private final InputContext inputContext;
+  private final boolean compositeFetch;
   private final Inflater inflater;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
@@ -71,7 +73,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   public ShuffleInputEventHandlerImpl(InputContext inputContext,
                                       ShuffleManager shuffleManager,
                                       FetchedInputAllocator inputAllocator, CompressionCodec codec,
-                                      boolean ifileReadAhead, int ifileReadAheadLength) {
+                                      boolean ifileReadAhead, int ifileReadAheadLength, boolean compositeFetch) {
     this.inputContext = inputContext;
     this.shuffleManager = shuffleManager;
     this.inputAllocator = inputAllocator;
@@ -81,6 +83,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     // this currently relies on a user to enable the flag
     // expand on idea based on vertex parallelism and num inputs
     this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0);
+    this.compositeFetch = compositeFetch;
     this.inflater = TezCommonUtils.newInflater();
   }
 
@@ -113,10 +116,10 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet);
       shuffleManager.updateEventReceivedTime();
     } else if (event instanceof CompositeRoutedDataMovementEvent) {
-      CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event;
+      CompositeRoutedDataMovementEvent crdme = (CompositeRoutedDataMovementEvent)event;
       DataMovementEventPayloadProto shufflePayload;
       try {
-        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload()));
+        shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(crdme.getUserPayload()));
       } catch (InvalidProtocolBufferException e) {
         throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
       }
@@ -129,9 +132,14 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
           throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
         }
       }
-      for (int offset = 0; offset < edme.getCount(); offset++) {
-        numDmeEvents.incrementAndGet();
-        processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet);
+      if (compositeFetch) {
+        numDmeEvents.addAndGet(crdme.getCount());
+        processCompositeRoutedDataMovementEvent(crdme, shufflePayload, emptyPartitionsBitSet);
+      } else {
+        for (int offset = 0; offset < crdme.getCount(); offset++) {
+          numDmeEvents.incrementAndGet();
+          processDataMovementEvent(crdme.expand(offset), shufflePayload, emptyPartitionsBitSet);
+        }
       }
       shuffleManager.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
@@ -166,23 +174,59 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
 
     if (shufflePayload.hasEmptyPartitions()) {
       if (emptyPartitionsBitSet.get(srcIndex)) {
-        InputAttemptIdentifier srcAttemptIdentifier =
-            constructInputAttemptIdentifier(dme, shufflePayload, false);
+        CompositeInputAttemptIdentifier srcAttemptIdentifier =
+            constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(), shufflePayload, false);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: ["
               + srcAttemptIdentifier + "]. Not fetching.");
         }
         numDmeEventsNoData.incrementAndGet();
-        shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
+        shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(0));
         return;
       }
     }
 
-    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme,
+    CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(),
         shufflePayload, (useSharedInputs && srcIndex == 0));
 
-    shuffleManager.addKnownInput(shufflePayload.getHost(),
-        shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
+    shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
+  }
+
+  private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
+    int partitionId = crdme.getSourceIndex();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DME srcIdx: " + partitionId + ", targetIndex: " + crdme.getTargetIndex() + ", count:" + crdme.getCount()
+          + ", attemptNum: " + crdme.getVersion() + ", payload: " + ShuffleUtils
+          .stringify(shufflePayload));
+    }
+
+    if (shufflePayload.hasEmptyPartitions()) {
+      CompositeInputAttemptIdentifier srcAttemptIdentifier =
+          constructInputAttemptIdentifier(crdme.getTargetIndex(), crdme.getCount(), crdme.getVersion(), shufflePayload, false);
+
+      boolean allPartitionsEmpty = true;
+      for (int i = 0; i < crdme.getCount(); i++) {
+        int srcPartitionId = partitionId + i;
+        allPartitionsEmpty &= emptyPartitionsBitSet.get(srcPartitionId);
+        if (emptyPartitionsBitSet.get(srcPartitionId)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
+                + srcAttemptIdentifier + "]. Not fetching.");
+          }
+          numDmeEventsNoData.getAndIncrement();
+          shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(i));
+        }
+      }
+
+      if (allPartitionsEmpty) {
+        return;
+      }
+    }
+
+    CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(crdme.getTargetIndex(), crdme.getCount(), crdme.getVersion(),
+        shufflePayload, (useSharedInputs && partitionId == 0));
+
+    shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, partitionId);
   }
 
   private void processInputFailedEvent(InputFailedEvent ife) {
@@ -193,26 +237,27 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   /**
    * Helper method to create InputAttemptIdentifier
    *
-   * @param dmEvent
+   * @param targetIndex target index of the first input covered by the identifier
+   * @param targetIndexCount number of consecutive target indexes covered (the partition range width)
+   * @param version attempt number of the source task that produced the data
    * @param shufflePayload
-   * @return InputAttemptIdentifier
+   * @param isShared whether the fetched input may be shared across inputs of this task
+   * @return CompositeInputAttemptIdentifier spanning the given range of target indexes
    */
-  private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent,
+  private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version,
       DataMovementEventPayloadProto shufflePayload, boolean isShared) {
     String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null;
-    InputAttemptIdentifier srcAttemptIdentifier = null;
+    CompositeInputAttemptIdentifier srcAttemptIdentifier = null;
     if (shufflePayload.hasSpillId()) {
       int spillEventId = shufflePayload.getSpillId();
       boolean lastEvent = shufflePayload.getLastEvent();
       InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
           .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
       srcAttemptIdentifier =
-          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent
-              .getVersion(), pathComponent, isShared, spillInfo, spillEventId);
+          new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, isShared, spillInfo, spillEventId, targetIndexCount);
     } else {
       srcAttemptIdentifier =
-          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(),
-              pathComponent, isShared);
+          new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, isShared, targetIndexCount);
     }
     return srcAttemptIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 91021a1..3964431 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -48,6 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -142,6 +143,7 @@ public class ShuffleManager implements FetcherCallback {
   private final boolean localDiskFetchEnabled;
   private final boolean sharedFetchEnabled;
   private final boolean verifyDiskChecksum;
+  private final boolean compositeFetch;
   
   private final int ifileBufferSize;
   private final boolean ifileReadAhead;
@@ -211,6 +213,7 @@ public class ShuffleManager implements FetcherCallback {
     this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
+    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
     
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
@@ -419,7 +422,7 @@ public class ShuffleManager implements FetcherCallback {
       httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
         jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
-        localhostName, shufflePort, asyncHttp, verifyDiskChecksum);
+        localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
 
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
@@ -456,7 +459,7 @@ public class ShuffleManager implements FetcherCallback {
       if (includedMaps >= maxTaskOutputAtOnce) {
         inputIter.remove();
         //add to inputHost
-        inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(),
+        inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(), pendingInputsOfOnePartition.getPartitionCount(),
             input);
       } else {
         includedMaps++;
@@ -467,6 +470,7 @@ public class ShuffleManager implements FetcherCallback {
     }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         pendingInputsOfOnePartition.getPartition(),
+        pendingInputsOfOnePartition.getPartitionCount(),
             pendingInputsOfOnePartition.getInputs());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created Fetcher for host: " + inputHost.getHost()
@@ -479,7 +483,7 @@ public class ShuffleManager implements FetcherCallback {
   /////////////////// Methods for InputEventHandler
   
   public void addKnownInput(String hostName, int port,
-      InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
+                            CompositeInputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
     HostPort identifier = new HostPort(hostName, port);
     InputHost host = knownSrcHosts.get(identifier);
     if (host == null) {
@@ -497,13 +501,14 @@ public class ShuffleManager implements FetcherCallback {
     if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
       return;
     }
-
     int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    if (shuffleInfoEventsMap.get(inputIdentifier) == null) {
-      shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
+    for (int i = 0; i < srcAttemptIdentifier.getInputIdentifierCount(); i++) {
+      if (shuffleInfoEventsMap.get(inputIdentifier + i) == null) {
+        shuffleInfoEventsMap.put(inputIdentifier + i, new ShuffleEventInfo(srcAttemptIdentifier.expand(i)));
+      }
     }
 
-    host.addKnownInput(srcPhysicalIndex, srcAttemptIdentifier);
+    host.addKnownInput(srcPhysicalIndex, srcAttemptIdentifier.getInputIdentifierCount(), srcAttemptIdentifier);
     lock.lock();
     try {
       boolean added = pendingHosts.offer(host);
@@ -1007,7 +1012,7 @@ public class ShuffleManager implements FetcherCallback {
           InputHost inputHost = knownSrcHosts.get(identifier);
           assert inputHost != null;
           for (InputAttemptIdentifier input : pendingInputs) {
-            inputHost.addKnownInput(result.getPartition(), input);
+            inputHost.addKnownInput(result.getPartition(), result.getPartitionCount(), input);
           }
           inputHost.setAdditionalInfo(result.getAdditionalInfo());
           pendingHosts.add(inputHost);


Mime
View raw message