tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-1094. Support pipelined data transfer for Unordered Output (rbalamohan)
Date Thu, 12 Mar 2015 09:08:40 GMT
Repository: tez
Updated Branches:
  refs/heads/master 59529aba8 -> e0e19122f


TEZ-1094. Support pipelined data transfer for Unordered Output (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e0e19122
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e0e19122
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e0e19122

Branch: refs/heads/master
Commit: e0e19122f6b0371eb3bb85e1236dd139778198eb
Parents: 59529ab
Author: Rajesh Balamohan <rbalamohan@hortonworks.com>
Authored: Thu Mar 12 14:37:09 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@hortonworks.com>
Committed: Thu Mar 12 14:37:49 2015 +0530

----------------------------------------------------------------------
 .../library/api/TezRuntimeConfiguration.java    |  24 +-
 .../runtime/library/common/shuffle/Fetcher.java |   6 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |  38 ++-
 .../common/shuffle/impl/ShuffleManager.java     | 234 +++++++++++--
 .../orderedgrouped/ShuffleScheduler.java        |   8 +-
 .../common/sort/impl/PipelinedSorter.java       |   5 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   4 +-
 .../writers/UnorderedPartitionedKVWriter.java   | 341 ++++++++++++++-----
 .../output/OrderedPartitionedKVOutput.java      |   8 +-
 .../output/UnorderedPartitionedKVOutput.java    |   1 +
 .../common/sort/impl/TestPipelinedSorter.java   |   3 +-
 .../sort/impl/dflt/TestDefaultSorter.java       |   7 +-
 .../TestUnorderedPartitionedKVWriter.java       | 259 +++++++++++++-
 .../TestUnorderedPartitionedKVOutputConfig.java |   9 +
 .../library/output/TestOnFileSortedOutput.java  |   6 +-
 15 files changed, 777 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index f2ab382..565b47a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -280,25 +280,25 @@ public class TezRuntimeConfiguration {
    */
   public static final boolean TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT = false;
 
-  //TODO: Change description when we start supporting pipelined shuffle in unordered cases
   /**
-   * Enable pipelined shuffle in ordered producer/consumer. Expert knob.
-   * Works only with PipelinedSorter. Set tez.runtime.sort.threads > 2 for enabling
-   * PipelinedSorter.  Ensure to set tez.runtime.disable.final-merge.in.sorter=true.
+   * Expert level setting. Enable pipelined shuffle in ordered outputs and in unordered
+   * partitioned outputs. In ordered cases, it works with PipelinedSorter.
+   * Set tez.runtime.sort.threads to a value greater than 1 to enable PipelinedSorter.
+   * Make sure to set tez.runtime.enable.final-merge.in.output=false.
    * Speculative execution needs to be turned off when using this parameter. //TODO: TEZ-2132
    */
   public static final String TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED =
-      TEZ_RUNTIME_PREFIX +
-          "pipelined-shuffle.enabled";
+      TEZ_RUNTIME_PREFIX + "pipelined-shuffle.enabled";
   public static final boolean TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT = false;
 
   /**
-   * final merge in defaultsorter/pipelinedsorter.
-   * speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132
+   * Expert level setting. Enable final merge in ordered (defaultsorter/pipelinedsorter) outputs.
+   * Speculative execution needs to be turned off when disabling this parameter. //TODO: TEZ-2132
    */
-  public static final String TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER =
-      TEZ_RUNTIME_PREFIX + "enable.final-merge.in.sorter";
-  public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT = true;
+  public static final String TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT =
+      TEZ_RUNTIME_PREFIX + "enable.final-merge.in.output";
+  public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT = true;
+
 
   /**
    * Share data fetched between tasks running on the same host if applicable
@@ -368,7 +368,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
     tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
-    tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER);
+    tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
     tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 30dad46..3661361 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -30,7 +30,6 @@ import java.nio.channels.OverlappingFileLockException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -104,7 +103,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
   private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
-  private LinkedHashSet<InputAttemptIdentifier> remaining;
+  private List<InputAttemptIdentifier> remaining;
 
   private URL url;
   private volatile DataInputStream input;
@@ -181,7 +180,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
               + "- partition is non-zero (%d)", partition);
     }
 
-    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
+    //Similar to TEZ-2172 (remove can be expensive with list)
+    remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
 
     HostFetchResult hostFetchResult;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index ffcd128..c4d6ce3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -32,10 +32,12 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
@@ -112,8 +114,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
           .getEmptyPartitions());
       BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
       if (emptyPartionsBitSet.get(srcIndex)) {
-        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
-            dme.getVersion());
+        InputAttemptIdentifier srcAttemptIdentifier =
+            constructInputAttemptIdentifier(dme, shufflePayload, false);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: ["
               + srcAttemptIdentifier + "]. Not fetching.");
@@ -123,9 +125,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       }
     }
 
-    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
-        dme.getTargetIndex(), dme.getVersion(),
-        shufflePayload.getPathComponent(), (useSharedInputs && srcIndex == 0));
+    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme,
+        shufflePayload, (useSharedInputs && srcIndex == 0));
 
     if (shufflePayload.hasData()) {
       DataProto dataProto = shufflePayload.getData();
@@ -166,5 +167,32 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
   }
 
+  /**
+   * Helper method to create InputAttemptIdentifier
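+   * @param dmEvent event supplying the target index and version
+   * @param shufflePayload payload supplying the optional path component and spill details
+   * @param isShared shared-input flag passed through to the identifier
+   * @return InputAttemptIdentifier built from the event and payload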
+   */
+  private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent,
+      DataMovementEventPayloadProto shufflePayload, boolean isShared) {
+    String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null;
+    InputAttemptIdentifier srcAttemptIdentifier = null;
+    if (shufflePayload.hasSpillId()) {
+      int spillEventId = shufflePayload.getSpillId();
+      boolean lastEvent = shufflePayload.getLastEvent();
+      InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
+          .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
+      srcAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent
+              .getVersion(), pathComponent, isShared, spillInfo, spillEventId);
+    } else {
+      srcAttemptIdentifier =
+          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(),
+              pathComponent, isShared);
+    }
+    return srcAttemptIdentifier;
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index fc42e3d..d2e9682 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -23,16 +23,18 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.text.DecimalFormat;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +43,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.crypto.SecretKey;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -115,7 +119,8 @@ public class ShuffleManager implements FetcherCallback {
   private Set<Fetcher> runningFetchers;
   
   private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
-  
+  private final AtomicInteger numFetchedSpills = new AtomicInteger(0);
+
   private final long startTime;
   private long lastProgressTime;
   private long totalBytesShuffledTillNow;
@@ -161,6 +166,10 @@ public class ShuffleManager implements FetcherCallback {
   private final TezCounter firstEventReceived;
   private final TezCounter lastEventReceived;
 
+  //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source.
+  @VisibleForTesting
+  final Map<InputAttemptIdentifier, ShuffleEventInfo> shuffleInfoEventsMap = Maps.newHashMap();
+
   // TODO More counters - FetchErrors, speed?
   
   public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
@@ -194,7 +203,11 @@ public class ShuffleManager implements FetcherCallback {
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
     completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
-    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
+    /**
+     * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
+     * We do not know upfront the number of spills from source.
+     */
+    completedInputs = new LinkedBlockingDeque<FetchedInput>();
     knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
     pendingHosts = new LinkedBlockingQueue<InputHost>();
     obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
@@ -370,6 +383,17 @@ public class ShuffleManager implements FetcherCallback {
     for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
         .iterator(); inputIter.hasNext();) {
       InputAttemptIdentifier input = inputIter.next();
+
+      //For pipelined shuffle.
+      //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt
+      if (input.canRetrieveInputInChunks() && input.getAttemptNumber() > 0) {
+        //speculative attempts or failure attempts. Fail fast here.
+        reportFatalError(new IOException(), input + " already exists. "
+            + "Previous attempt's data could have been already merged "
+            + "to memory/disk outputs.  Failing the fetch early instead of adding to fetcher");
+        continue;
+      }
+
       // Avoid adding attempts which have already completed.
       if (completedInputSet.contains(input.getInputIdentifier())) {
         inputIter.remove();
@@ -406,6 +430,21 @@ public class ShuffleManager implements FetcherCallback {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
     }
+
+    if (srcAttemptIdentifier.canRetrieveInputInChunks()) {
+      //TODO: need to check for speculative tasks later. TEZ-2132
+      if (srcAttemptIdentifier.getAttemptNumber() > 0) {
+        //speculative attempts or failure attempts. Fail fast here.
+        reportFatalError(new IOException(), srcAttemptIdentifier + " already exists. "
+            + "Previous attempt's data could have been already merged "
+            + "to memory/disk outputs.  Failing the fetch early instead of adding to addKnownInput");
+        return;
+      }
+      if (shuffleInfoEventsMap.get(srcAttemptIdentifier) == null) {
+        shuffleInfoEventsMap.put(srcAttemptIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
+      }
+    }
+
     host.addKnownInput(srcAttemptIdentifier);
     lock.lock();
     try {
@@ -429,7 +468,12 @@ public class ShuffleManager implements FetcherCallback {
     if (!completedInputSet.contains(inputIdentifier)) {
       synchronized (completedInputSet) {
         if (!completedInputSet.contains(inputIdentifier)) {
-          registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
+          NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
+          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+            registerCompletedInput(fetchedInput);
+          } else {
+            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
+          }
         }
       }
     }
@@ -474,7 +518,11 @@ public class ShuffleManager implements FetcherCallback {
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
-          registerCompletedInput(fetchedInput);
+          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+            registerCompletedInput(fetchedInput);
+          } else {
+            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
+          }
         }
       }
     }
@@ -499,7 +547,47 @@ public class ShuffleManager implements FetcherCallback {
 
   /////////////////// End of Methods for InputEventHandler
   /////////////////// Methods from FetcherCallbackHandler
-  
+
+  /**
+   * Placeholder for tracking shuffle events in case we get multiple spills info for the same
+   * attempt.
+   */
+  static class ShuffleEventInfo {
+    BitSet eventsProcessed;
+    int finalEventId = -1; //0 indexed
+    String id;
+
+
+    ShuffleEventInfo(InputAttemptIdentifier input) {
+      this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
+      this.eventsProcessed = new BitSet();
+    }
+
+    void spillProcessed(int spillId) {
+      if (finalEventId != -1) {
+        Preconditions.checkState(eventsProcessed.cardinality() <= (finalEventId + 1),
+            "Wrong state. eventsProcessed cardinality=" + eventsProcessed.cardinality() + " "
+                + "finalEventId=" + finalEventId + ", spillId=" + spillId + ", " + toString());
+      }
+      eventsProcessed.set(spillId);
+    }
+
+    void setFinalEventId(int spillId) {
+      finalEventId = spillId;
+    }
+
+    boolean isDone() {
+      LOG.info("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" +
+          eventsProcessed.cardinality());
+      return ((finalEventId != -1) && (finalEventId + 1) == eventsProcessed.cardinality());
+    }
+
+    public String toString() {
+      return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId
+          +  ", id=" + id + "]";
+    }
+  }
+
   @Override
   public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
       FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
@@ -537,7 +625,12 @@ public class ShuffleManager implements FetcherCallback {
           }
           decompressedDataSizeCounter.increment(decompressedLength);
 
-          registerCompletedInput(fetchedInput);
+          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+            registerCompletedInput(fetchedInput);
+          } else {
+            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
+          }
+
           lock.lock();
           try {
             totalBytesShuffledTillNow += fetchedBytes;
@@ -562,6 +655,104 @@ public class ShuffleManager implements FetcherCallback {
     // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
   }
 
+  private void registerCompletedInput(FetchedInput fetchedInput) {
+    lock.lock();
+    try {
+      maybeInformInputReady(fetchedInput);
+      adjustCompletedInputs(fetchedInput);
+      numFetchedSpills.getAndIncrement();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void maybeInformInputReady(FetchedInput fetchedInput) {
+    lock.lock();
+    try {
+      completedInputs.add(fetchedInput);
+      if (!inputReadyNotificationSent.getAndSet(true)) {
+        // TODO Should eventually be controlled by Inputs which are processing the data.
+        inputContext.inputIsReady();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void adjustCompletedInputs(FetchedInput fetchedInput) {
+    lock.lock();
+    try {
+      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
+
+      int numComplete = numCompletedInputs.incrementAndGet();
+      if (numComplete == numInputs) {
+        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void registerCompletedInputForPipelinedShuffle(InputAttemptIdentifier
+      srcAttemptIdentifier, FetchedInput fetchedInput) {
+    /**
+     * With pipelined shuffle, an attempt can produce multiple spills. Claim success only
+     * when all spills pertaining to an attempt are done.
+     */
+    ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier);
+
+    //TODO: need to check for speculative tasks later. TEZ-2132
+    if (srcAttemptIdentifier.getAttemptNumber() > 0) {
+      //speculative attempts or failure attempts. Fail fast here.
+      reportFatalError(new IOException(), "Previous event already got scheduled for " +
+          srcAttemptIdentifier + ". Previous attempt's data could have been already merged "
+          + "to memory/disk outputs.  Failing the fetch early.");
+      return;
+    }
+
+    //for empty partition case
+    if (eventInfo == null && fetchedInput instanceof NullFetchedInput) {
+      eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
+      shuffleInfoEventsMap.put(srcAttemptIdentifier, eventInfo);
+    }
+
+    assert(eventInfo != null);
+    eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
+    numFetchedSpills.getAndIncrement();
+
+    if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) {
+      eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
+    }
+
+    lock.lock();
+    try {
+      /**
+       * When fetch is complete for a spill, add it to completedInputs to ensure that it is
+       * available for downstream processing. Final success will be claimed only when all
+       * spills are downloaded from the source.
+       */
+      maybeInformInputReady(fetchedInput);
+
+
+      //check if we downloaded all spills pertaining to this InputAttemptIdentifier
+      if (eventInfo.isDone()) {
+        adjustCompletedInputs(fetchedInput);
+        shuffleInfoEventsMap.remove(srcAttemptIdentifier);
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("eventInfo " + eventInfo.toString());
+    }
+  }
+
+  private void reportFatalError(Throwable exception, String message) {
+    LOG.fatal(message);
+    inputContext.fatalError(exception, message);
+  }
+
   @Override
   public void fetchFailed(String host,
       InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
@@ -572,9 +763,7 @@ public class ShuffleManager implements FetcherCallback {
         + connectFailed);
     failedShufflesCounter.increment(1);
     if (srcAttemptIdentifier == null) {
-      String message = "Received fetchFailure for an unknown src (null)";
-      LOG.fatal(message);
-      inputContext.fatalError(null, message);
+      reportFatalError(null, "Received fetchFailure for an unknown src (null)");
     } else {
     InputReadErrorEvent readError = InputReadErrorEvent.create(
         "Fetch failure while fetching from "
@@ -584,7 +773,7 @@ public class ShuffleManager implements FetcherCallback {
             srcAttemptIdentifier.getAttemptNumber()),
         srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
         srcAttemptIdentifier.getAttemptNumber());
-    
+
     List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
     failedEvents.add(readError);
     inputContext.sendEvents(failedEvents);
@@ -616,24 +805,6 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
 
-  private void registerCompletedInput(FetchedInput fetchedInput) {
-    lock.lock();
-    try {
-      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
-      completedInputs.add(fetchedInput);
-      if (!inputReadyNotificationSent.getAndSet(true)) {
-        // TODO Should eventually be controlled by Inputs which are processing the data.
-        inputContext.inputIsReady();
-      }
-      int numComplete = numCompletedInputs.incrementAndGet();
-      if (numComplete == numInputs) {
-        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-  
   /////////////////// Methods for walking the available inputs
   
   /**
@@ -723,11 +894,12 @@ public class ShuffleManager implements FetcherCallback {
 
   private void logProgress() {
     double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
-    int inputsDone = numInputs - numCompletedInputs.get();
+    int inputsDone = numCompletedInputs.get();
     long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
     double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " of " + numInputs +
+    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " +
+        numInputs +
         ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index f09a20c..d3ee161 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -71,6 +71,7 @@ class ShuffleScheduler {
   private boolean[] finishedMaps;
   private final int numInputs;
   private int remainingMaps;
+  private int numFetchedSpills;
   private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
   //TODO Clean this and other maps at some point
   private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
@@ -200,7 +201,8 @@ class ShuffleScheduler {
     void spillProcessed(int spillId) {
       if (finalEventId != -1) {
         Preconditions.checkState(eventsProcessed.cardinality() <= (finalEventId + 1),
-            "Wrong state " + toString());
+            "Wrong state. eventsProcessed cardinality=" + eventsProcessed.cardinality() + " "
+                + "finalEventId=" + finalEventId + ", spillId=" + spillId + ", " + toString());
       }
       eventsProcessed.set(spillId);
     }
@@ -260,6 +262,7 @@ class ShuffleScheduler {
       if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
         remainingMaps = remainingMaps - 1;
         setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
+        numFetchedSpills++;
       } else {
         ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier);
 
@@ -280,6 +283,7 @@ class ShuffleScheduler {
 
         assert(eventInfo != null);
         eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
+        numFetchedSpills++;
 
         if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) {
           eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
@@ -338,7 +342,7 @@ class ShuffleScheduler {
     long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
     double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " of " + numInputs +
+    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills +  ") of " + numInputs +
         ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index d36053c..34a7e3b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -29,7 +29,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -128,8 +127,8 @@ public class PipelinedSorter extends ExternalSorter {
     partitionBits = bitcount(partitions)+1;
 
     finalMergeEnabled = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER,
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT);
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
 
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index f44e176..a56249d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -143,8 +143,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                                        TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);
 
     finalMergeEnabled = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER,
-        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT);
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
 
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index be128a9..8b5f196 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -28,7 +28,6 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -37,8 +36,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -135,6 +136,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private final ReentrantLock spillLock = new ReentrantLock();
   private final Condition spillInProgress = spillLock.newCondition();
 
+  private final boolean pipelinedShuffle;
+
+  private final long indexFileSizeEstimate;
+
   public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
       int numOutputs, long availableMemoryBytes) throws IOException {
     super(outputContext, conf, numOutputs);
@@ -173,6 +178,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     outputLargeRecordsCounter = outputContext.getCounters().findCounter(
         TaskCounter.OUTPUT_LARGE_RECORDS);
+
+    //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
+    // this case.  Add it later if needed.
+    pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
+
+
+    indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    LOG.info("pipelinedShuffle=" + pipelinedShuffle);
   }
 
   private void computeNumBuffersAndSize(int bufferLimit) {
@@ -219,7 +234,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
         // Key too large for any buffer. Write entire record to disk.
         currentBuffer.reset();
-        writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+        writeLargeRecord(key, value, partition);
         return;
       } else { // Exceeded length on current buffer.
         // Try resetting the buffer to the next one, if this was not the start of a buffer,
@@ -237,7 +252,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       if (metaStart == 0) {
         // Key + Value too large for a single buffer.
         currentBuffer.reset();
-        writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+        writeLargeRecord(key, value, partition);
         return;
       } else { // Exceeded length on current buffer.
         // Try writing key+value to a new buffer - will fall back to disk if that fails.
@@ -276,9 +291,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
       pendingSpillCount.incrementAndGet();
 
-      ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(currentBuffer,
-          numSpills.incrementAndGet(), codec, spilledRecordsCounter, false));
-      Futures.addCallback(future, new SpillCallback(numSpills.get()));
+      SpillPathDetails spillPathDetails = getSpillPathDetails(false, -1);
+
+      ListenableFuture<SpillResult> future = spillExecutor.submit(
+          new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillPathDetails));
+      Futures.addCallback(future, new SpillCallback(spillPathDetails.spillIndex));
 
       WrappedBuffer wb = getNextAvailableBuffer();
       currentBuffer = wb;
@@ -317,16 +334,18 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     private final WrappedBuffer wrappedBuffer;
     private final CompressionCodec codec;
     private final TezCounter numRecordsCounter;
-    private final int spillNumber;
-    private final boolean isFinalSpill;
+    private final int spillIndex;
+    private final SpillPathDetails spillPathDetails;
 
-    public SpillCallable(WrappedBuffer wrappedBuffer, int spillNumber, CompressionCodec codec,
-        TezCounter numRecordsCounter, boolean isFinal) {
+    public SpillCallable(WrappedBuffer wrappedBuffer, CompressionCodec codec,
+        TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) {
       this.wrappedBuffer = wrappedBuffer;
       this.codec = codec;
       this.numRecordsCounter = numRecordsCounter;
-      this.spillNumber = spillNumber;
-      this.isFinalSpill = isFinal;
+      this.spillIndex = spillPathDetails.spillIndex;
+      Preconditions.checkArgument(spillPathDetails.outputFilePath != null, "Spill output file "
+          + "path can not be null");
+      this.spillPathDetails = spillPathDetails;
     }
 
     @Override
@@ -336,18 +355,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       // Number of parallel spills determined by number of threads.
       // Last spill synchronization handled separately.
       SpillResult spillResult = null;
-      long spillSize = wrappedBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH;
-      Path outPath = null;
-      if (isFinalSpill) {
-        outPath = outputFileHandler.getOutputFileForWrite(spillSize);
-        finalOutPath = outPath;
-      } else {
-        outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
-      }
-      FSDataOutputStream out = rfs.create(outPath);
+      FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath);
       TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
       DataInputBuffer key = new DataInputBuffer();
       DataInputBuffer val = new DataInputBuffer();
+      long compressedLength = 0;
       for (int i = 0; i < numPartitions; i++) {
         IFile.Writer writer = null;
         try {
@@ -359,12 +371,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null);
           writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val);
           writer.close();
-          if (isFinalSpill) {
-            fileOutputBytesCounter.increment(writer.getCompressedLength());
-          } else {
-            additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
-          }
-          spillResult = new SpillResult(writer.getCompressedLength(), this.wrappedBuffer);
+          compressedLength += writer.getCompressedLength();
           TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
               writer.getCompressedLength());
           spillRecord.putIndex(indexRecord, i);
@@ -375,17 +382,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           }
         }
       }
-      if (isFinalSpill) {
-        long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-        finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
-        spillRecord.writeToFile(finalIndexPath, conf);
-        fileOutputBytesCounter.increment(indexFileSizeEstimate);
-        LOG.info("Finished final and only spill");
-      } else {
-        SpillInfo spillInfo = new SpillInfo(spillRecord, outPath);
-        spillInfoList.add(spillInfo);
-        numAdditionalSpillsCounter.increment(1);
-        LOG.info("Finished spill " + spillNumber);
+      spillResult = new SpillResult(compressedLength, this.wrappedBuffer);
+
+      handleSpillIndex(spillPathDetails, spillRecord);
+      LOG.info("Finished spill " + spillIndex);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Spill=" + spillIndex + ", indexPath="
+            + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath);
       }
       return spillResult;
     }
@@ -406,7 +410,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   }
 
   public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
-    int initialMemRequestMb = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB,
+    int initialMemRequestMb = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB,
         TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT);
     Preconditions.checkArgument(initialMemRequestMb != 0,
         TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + " should be larger than 0");
@@ -443,80 +448,160 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       LOG.info("All spills complete");
       // Assuming close will be called on the same thread as the write
       cleanup();
-      if (numSpills.get() > 0) {
-        mergeAll();
-      } else {
-        finalSpill();
+
+      List<Event> events = Lists.newLinkedList();
+      if (!pipelinedShuffle) {
+        //Regular code path.
+        if (numSpills.get() > 0) {
+          mergeAll();
+        } else {
+          finalSpill();
+        }
+        cleanupCurrentBuffer();
+        return Collections.singletonList(generateDMEvent());
       }
 
-      currentBuffer.cleanup();
-      currentBuffer = null;
+      //For pipelined case, send out an event in case finalspill generated a spill file.
+      if (finalSpill()) {
+        sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, numSpills.get() - 1, true);
+      }
+      cleanupCurrentBuffer();
+      return events;
     }
-
-    return Collections.singletonList(generateEvent());
   }
 
-  private void cleanup() {
-    if (spillExecutor != null) {
-      spillExecutor.shutdownNow();
-    }
-    for (int i = 0; i < buffers.length; i++) {
-      if (buffers[i] != null && buffers[i] != currentBuffer) {
-        buffers[i].cleanup();
-        buffers[i] = null;
+  private BitSet getEmptyPartitions(int[] recordsPerPartition) {
+    Preconditions.checkArgument(recordsPerPartition != null, "records per partition can not be null");
+    BitSet emptyPartitions = new BitSet();
+    for (int i = 0; i < numPartitions; i++) {
+      if (recordsPerPartition[i] == 0 ) {
+        emptyPartitions.set(i);
       }
     }
-    availableBuffers.clear();
+    return emptyPartitions;
+  }
+
+  private Event generateDMEvent() throws IOException {
+    BitSet emptyPartitions = getEmptyPartitions(numRecordsPerPartition);
+    return generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions);
   }
 
-  private Event generateEvent() throws IOException {
-    DataMovementEventPayloadProto.Builder payloadBuidler = DataMovementEventPayloadProto
+  private Event generateDMEvent(boolean addSpillDetails, int spillId,
+      boolean isLastSpill, String pathComponent, BitSet emptyPartitions)
+      throws IOException {
+
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
 
     String host = getHost();
-    int shufflePort = getShufflePort();
-
-    BitSet emptyPartitions = new BitSet();
-    for (int i = 0; i < numPartitions; i++) {
-      if (numRecordsPerPartition[i] == 0) {
-        emptyPartitions.set(i);
-      }
-    }
     if (emptyPartitions.cardinality() != 0) {
       // Empty partitions exist
-      ByteString emptyPartitionsByteString = TezCommonUtils.compressByteArrayToByteString(
-          TezUtilsInternal
-          .toByteArray(emptyPartitions));
-      payloadBuidler.setEmptyPartitions(emptyPartitionsByteString);
+      ByteString emptyPartitionsByteString =
+          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions));
+      payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
     }
+
     if (emptyPartitions.cardinality() != numPartitions) {
       // Populate payload only if at least 1 partition has data
-      payloadBuidler.setHost(host);
-      payloadBuidler.setPort(shufflePort);
-      payloadBuidler.setPathComponent(outputContext.getUniqueIdentifier());
+      payloadBuilder.setHost(host);
+      payloadBuilder.setPort(getShufflePort());
+      payloadBuilder.setPathComponent(pathComponent);
     }
 
-    CompositeDataMovementEvent cDme = CompositeDataMovementEvent.create(0, numPartitions,
-        payloadBuidler.build().toByteString().asReadOnlyByteBuffer());
-    return cDme;
+    if (addSpillDetails) {
+      payloadBuilder.setSpillId(spillId);
+      payloadBuilder.setLastEvent(isLastSpill);
+    }
+
+    ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
+    return CompositeDataMovementEvent.create(0, numPartitions, payload);
+  }
+
+  private void cleanupCurrentBuffer() {
+    currentBuffer.cleanup();
+    currentBuffer = null;
+  }
+
+  private void cleanup() {
+    if (spillExecutor != null) {
+      spillExecutor.shutdownNow();
+    }
+    for (int i = 0; i < buffers.length; i++) {
+      if (buffers[i] != null && buffers[i] != currentBuffer) {
+        buffers[i].cleanup();
+        buffers[i] = null;
+      }
+    }
+    availableBuffers.clear();
   }
 
-  private void finalSpill() throws IOException {
+  private boolean finalSpill() throws IOException {
     if (currentBuffer.nextPosition == 0) {
-      return;
+      if (pipelinedShuffle) {
+        //Send final event with all empty partitions and null path component.
+        BitSet emptyPartitions = new BitSet(numPartitions);
+        emptyPartitions.flip(0, numPartitions);
+
+        outputContext.sendEvents(
+            Collections.singletonList(generateDMEvent(true, numSpills.get(), true, null, emptyPartitions)));
+      }
+      return false;
     } else {
       updateGlobalStats(currentBuffer);
-      SpillCallable spillCallable = new SpillCallable(currentBuffer, 0, codec, null, true);
+
+      //setup output file and index file
+      SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1);
+      SpillCallable spillCallable = new SpillCallable(currentBuffer, codec, null, spillPathDetails);
       try {
-        spillCallable.call();
+        SpillResult spillResult = spillCallable.call();
+
+        fileOutputBytesCounter.increment(spillResult.spillSize);
+        fileOutputBytesCounter.increment(indexFileSizeEstimate);
       } catch (Exception ex) {
         throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
       }
-      return;
+      return true;
     }
 
   }
 
+  /**
+   * Set up spill output file, index file details.
+   *
+   * @param isFinalSpill
+   * @param expectedSpillSize
+   * @return SpillPathDetails
+   * @throws IOException
+   */
+  private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) throws
+      IOException {
+    int spillNumber = numSpills.getAndIncrement();
+    long spillSize = (expectedSpillSize < 0) ?
+        (currentBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH) : expectedSpillSize;
+
+    Path outputFilePath = null;
+    Path indexFilePath = null;
+
+    if (!pipelinedShuffle) {
+      if (isFinalSpill) {
+        outputFilePath = outputFileHandler.getOutputFileForWrite(spillSize);
+        indexFilePath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+
+        //Setting this for tests
+        finalOutPath = outputFilePath;
+        finalIndexPath = indexFilePath;
+      } else {
+        outputFilePath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
+      }
+    } else {
+      outputFilePath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
+      indexFilePath  = outputFileHandler.getSpillIndexFileForWrite(spillNumber, indexFileSizeEstimate);
+    }
+
+    SpillPathDetails spillDetails = new SpillPathDetails(outputFilePath, indexFilePath, spillNumber);
+    return spillDetails;
+  }
+
   private void mergeAll() throws IOException {
     long expectedSize = spilledSize;
     if (currentBuffer.nextPosition != 0) {
@@ -526,9 +611,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       updateGlobalStats(currentBuffer);
     }
 
-    long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-    finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
-    finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+    SpillPathDetails spillPathDetails = getSpillPathDetails(true, expectedSize);
+    finalIndexPath = spillPathDetails.indexFilePath;
+    finalOutPath = spillPathDetails.outputFilePath;
 
     TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);
 
@@ -601,17 +686,23 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
   }
 
-  private void writeLargeRecord(final Object key, final Object value, final int partition,
-      final int spillNumber) throws IOException {
+  private void writeLargeRecord(final Object key, final Object value, final int partition)
+      throws IOException {
     numAdditionalSpillsCounter.increment(1);
     long size = sizePerBuffer - (currentBuffer.numRecords * META_SIZE) - currentBuffer.skipSize
         + numPartitions * APPROX_HEADER_LENGTH;
+    SpillPathDetails spillPathDetails = getSpillPathDetails(false, size);
+    int spillIndex = spillPathDetails.spillIndex;
     FSDataOutputStream out = null;
     long outSize = 0;
     try {
       final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
-      final Path outPath = outputFileHandler.getSpillFileForWrite(spillNumber, size);
+      final Path outPath = spillPathDetails.outputFilePath;
       out = rfs.create(outPath);
+      BitSet emptyPartitions = null;
+      if (pipelinedShuffle) {
+        emptyPartitions = new BitSet(numPartitions);
+      }
       for (int i = 0; i < numPartitions; i++) {
         final long recordStart = out.getPos();
         if (i == partition) {
@@ -634,11 +725,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
               writer.close();
             }
           }
+        } else {
+          if (emptyPartitions != null) {
+            emptyPartitions.set(i);
+          }
         }
       }
-      SpillInfo spillInfo = new SpillInfo(spillRecord, outPath);
-      spillInfoList.add(spillInfo);
-      LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillNumber);
+      handleSpillIndex(spillPathDetails, spillRecord);
+
+      sendPipelinedEventForSpill(emptyPartitions, spillIndex, false);
+
+      LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillIndex);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LargeRecord Spill=" + spillIndex + ", indexPath="
+            + spillPathDetails.indexFilePath + ", outputPath="
+            + spillPathDetails.outputFilePath);
+      }
     } finally {
       if (out != null) {
         out.close();
@@ -646,6 +748,19 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
+  private void handleSpillIndex(SpillPathDetails spillPathDetails, TezSpillRecord spillRecord)
+      throws IOException {
+    if (spillPathDetails.indexFilePath != null) {
+      //write the index record
+      spillRecord.writeToFile(spillPathDetails.indexFilePath, conf);
+    } else {
+      //add to cache
+      SpillInfo spillInfo = new SpillInfo(spillRecord, spillPathDetails.outputFilePath);
+      spillInfoList.add(spillInfo);
+      numAdditionalSpillsCounter.increment(1);
+    }
+  }
+
   private class ByteArrayOutputStream extends OutputStream {
 
     private final byte[] scratch = new byte[1];
@@ -721,6 +836,29 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     private static final long serialVersionUID = 1L;
   }
 
+  private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) {
+    if (!pipelinedShuffle) {
+      return;
+    }
+    //Send out an event for consuming.
+    try {
+      String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
+      Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
+          pathComponent, emptyPartitions);
+
+      LOG.info("Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
+      outputContext.sendEvents(Collections.singletonList(compEvent));
+    } catch (IOException e) {
+      LOG.fatal("Error in sending pipelined events", e);
+      outputContext.fatalError(e, "Error in sending pipelined events");
+    }
+  }
+
+  private void sendPipelinedEventForSpill(int[] recordsPerPartition, int spillNumber, boolean isFinalUpdate) {
+    BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition);
+    sendPipelinedEventForSpill(emptyPartitions, spillNumber, isFinalUpdate);
+  }
+
   private class SpillCallback implements FutureCallback<SpillResult> {
 
     private final int spillNumber;
@@ -733,6 +871,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     public void onSuccess(SpillResult result) {
       LOG.info("Spill# " + spillNumber + " complete.");
       spilledSize += result.spillSize;
+
+      sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false);
+
       try {
         result.wrappedBuffer.reset();
         availableBuffers.add(result.wrappedBuffer);
@@ -742,6 +883,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         outputContext.fatalError(e, "Failure while attempting to reset buffer after spill");
       }
 
+      if (!pipelinedShuffle) {
+        additionalSpillBytesWritternCounter.increment(result.spillSize);
+      } else {
+        fileOutputBytesCounter.increment(indexFileSizeEstimate);
+        fileOutputBytesCounter.increment(result.spillSize);
+      }
+
       spillLock.lock();
       try {
         if (pendingSpillCount.decrementAndGet() == 0) {
@@ -800,4 +948,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
     return shufflePort;
   }
+
+  @InterfaceAudience.Private
+  static class SpillPathDetails {
+    final Path indexFilePath;
+    final Path outputFilePath;
+    final int spillIndex;
+
+    SpillPathDetails(Path outputFilePath, Path indexFilePath, int spillIndex) {
+      this.outputFilePath = outputFilePath;
+      this.indexFilePath = indexFilePath;
+      this.spillIndex = spillIndex;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 0e859ba..90147ca 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -104,8 +104,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
           TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT);
 
       finalMergeEnabled = conf.getBoolean(
-          TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER,
-          TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER_DEFAULT);
+          TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
 
       pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
           .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
@@ -121,7 +121,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
       if (pipelinedShuffle) {
         Preconditions.checkArgument(!finalMergeEnabled, TezRuntimeConfiguration
-            .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER + " has to be set to false for pipelined "
+            .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT + " has to be set to false for pipelined "
             + "shuffle to work properly.");
 
         //TODO: Enable it for pipelinedsorter only and not for DefaultSorter
@@ -203,7 +203,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
-    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 1e39535..9053507 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -120,6 +120,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index d0187bd..073d956 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -33,7 +33,6 @@ import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -137,7 +136,7 @@ public class TestPipelinedSorter {
     this.numOutputs = 1;
     this.initialAvailableMem = 5 *1024 * 1024;
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
         initialAvailableMem, 1<<20);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 875f23a..c9c215d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -49,7 +49,6 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
@@ -180,7 +179,7 @@ public class TestDefaultSorter {
   public void testWithEmptyDataWithFinalMergeDisabled() throws IOException {
     OutputContext context = createTezOutputContext();
 
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
     MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
     context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
@@ -204,7 +203,7 @@ public class TestDefaultSorter {
   public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
     OutputContext context = createTezOutputContext();
 
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
     MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
     context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
@@ -233,7 +232,7 @@ public class TestDefaultSorter {
   public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
     OutputContext context = createTezOutputContext();
 
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
     MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index cb385ea..995eee6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -22,10 +22,15 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
@@ -85,6 +90,9 @@ import org.junit.runners.Parameterized.Parameters;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @RunWith(value = Parameterized.class)
 public class TestUnorderedPartitionedKVWriter {
@@ -127,7 +135,7 @@ public class TestUnorderedPartitionedKVWriter {
 
   @Test(timeout = 10000)
   public void testBufferSizing() throws IOException {
-    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
@@ -190,34 +198,34 @@ public class TestUnorderedPartitionedKVWriter {
 
   @Test(timeout = 10000)
   public void testRandomText() throws IOException, InterruptedException {
-    textTest(100, 10, 2048, 0, 0, 0);
+    textTest(100, 10, 2048, 0, 0, 0, false);
   }
 
   @Test(timeout = 10000)
   public void testLargeKeys() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 10, 0, 0);
+    textTest(0, 10, 2048, 10, 0, 0, false);
   }
 
   @Test(timeout = 10000)
   public void testLargevalues() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 0, 10, 0);
+    textTest(0, 10, 2048, 0, 10, 0, false);
   }
 
   @Test(timeout = 10000)
   public void testLargeKvPairs() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 0, 0, 10);
+    textTest(0, 10, 2048, 0, 0, 10, false);
   }
 
   @Test(timeout = 10000)
   public void testTextMixedRecords() throws IOException, InterruptedException {
-    textTest(100, 10, 2048, 10, 10, 10);
+    textTest(100, 10, 2048, 10, 10, 10, false);
   }
 
   public void textTest(int numRegularRecords, int numPartitions, long availableMemory,
-      int numLargeKeys, int numLargevalues, int numLargeKvPairs) throws IOException,
+      int numLargeKeys, int numLargevalues, int numLargeKvPairs, boolean pipeliningEnabled) throws IOException,
       InterruptedException {
     Partitioner partitioner = new HashPartitioner();
-    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
@@ -225,6 +233,10 @@ public class TestUnorderedPartitionedKVWriter {
 
     Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
         -1, HashPartitioner.class);
+    if (pipeliningEnabled) {
+      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    }
     CompressionCodec codec = null;
     if (shouldCompress) {
       codec = new DefaultCodec();
@@ -270,6 +282,9 @@ public class TestUnorderedPartitionedKVWriter {
       kvWriter.write(keyText, valText);
       numRecordsWritten++;
     }
+    if (pipeliningEnabled) {
+      verify(outputContext, times(numLargeKeys)).sendEvents(anyListOf(Event.class));
+    }
 
     // Write Large val records
     for (int i = 0; i < numLargevalues; i++) {
@@ -283,6 +298,9 @@ public class TestUnorderedPartitionedKVWriter {
       kvWriter.write(keyText, valText);
       numRecordsWritten++;
     }
+    if (pipeliningEnabled) {
+      verify(outputContext, times(numLargevalues + numLargeKeys)).sendEvents(anyListOf(Event.class));
+    }
 
     // Write records where key + val are large (but both can fit in the buffer individually)
     for (int i = 0; i < numLargeKvPairs; i++) {
@@ -296,6 +314,10 @@ public class TestUnorderedPartitionedKVWriter {
       kvWriter.write(keyText, valText);
       numRecordsWritten++;
     }
+    if (pipeliningEnabled) {
+      verify(outputContext, times(numLargevalues + numLargeKeys + numLargeKvPairs))
+          .sendEvents(anyListOf(Event.class));
+    }
 
     List<Event> events = kvWriter.close();
     verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class));
@@ -304,6 +326,10 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs,
         outputLargeRecordsCounter.getValue());
 
+    if (pipeliningEnabled) {
+      return;
+    }
+
     // Validate the event
     assertEquals(1, events.size());
     assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -373,10 +399,207 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(0, expectedValues.size());
   }
 
+  @Test(timeout = 10000)
+  public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
+    baseTestWithPipelinedTransfer(10, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSingleSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
+    baseTestWithPipelinedTransfer(50, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testMultipleSpills_WithPipelinedShuffle() throws IOException, InterruptedException {
+    baseTestWithPipelinedTransfer(200, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoRecords_WithPipelinedShuffle() throws IOException, InterruptedException {
+    baseTestWithPipelinedTransfer(0, 10, null, shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, InterruptedException {
+    baseTestWithPipelinedTransfer(200, 10, Sets.newHashSet(2, 5), shouldCompress);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 10, 20, 50, true);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  private void baseTestWithPipelinedTransfer(int numRecords, int numPartitions, Set<Integer>
+      skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException {
+
+    PartitionerForTest partitioner = new PartitionerForTest();
+    ApplicationId appId = ApplicationId.newInstance(10000000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+
+    Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
+        shouldCompress, -1);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    conf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+
+    CompressionCodec codec = null;
+    if (shouldCompress) {
+      codec = new DefaultCodec();
+      ((Configurable) codec).setConf(conf);
+    }
+
+    int numOutputs = numPartitions;
+    long availableMemory = 2048;
+    int numRecordsWritten = 0;
+
+    UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
+        conf, numOutputs, availableMemory);
+
+    int sizePerBuffer = kvWriter.sizePerBuffer;
+    int sizePerRecord = 4 + 8; // IntW + LongW
+    int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
+
+    IntWritable intWritable = new IntWritable();
+    LongWritable longWritable = new LongWritable();
+    for (int i = 0; i < numRecords; i++) {
+      intWritable.set(i);
+      longWritable.set(i);
+      int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
+      if (skippedPartitions != null && skippedPartitions.contains(partition)) {
+        continue;
+      }
+      kvWriter.write(intWritable, longWritable);
+      numRecordsWritten++;
+    }
+
+    int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
+    int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
+
+    ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
+    List<Event> events = kvWriter.close();
+    assertTrue(events.size() == 0); //close() returns no events with pipelining; they go out via outputContext.sendEvents()
+
+    verify(outputContext, atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture());
+    events = eventCaptor.getValue();
+
+    assertTrue(events.size() == 1); //the last event which was sent out
+
+    verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class));
+
+    // Verify the status of the buffers
+    if (numExpectedSpills == 0) {
+      assertEquals(1, kvWriter.numInitializedBuffers);
+    } else {
+      assertTrue(kvWriter.numInitializedBuffers > 1);
+    }
+    assertNull(kvWriter.currentBuffer);
+    assertEquals(0, kvWriter.availableBuffers.size());
+
+    // Verify the counters
+    TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES);
+    TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS);
+    TezCounter outputBytesWithOverheadCounter = counters
+        .findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter additionalSpillBytesWritternCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    TezCounter additionalSpillBytesReadCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    TezCounter numAdditionalSpillsCounter = counters
+        .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+    assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
+    assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
+    assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
+        outputBytesWithOverheadCounter.getValue());
+    long fileOutputBytes = fileOutputBytesCounter.getValue();
+    if (numRecordsWritten > 0) {
+      assertTrue(fileOutputBytes > 0);
+      if (!shouldCompress) {
+        assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
+      }
+    } else {
+      assertEquals(0, fileOutputBytes);
+    }
+    assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue());
+    long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
+    long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
+
+    //No additional spill bytes written when final merge is disabled.
+    assertEquals(additionalSpillBytesWritten, 0);
+
+    //Additional spill bytes read must equal bytes written (both zero) when final merge is disabled.
+    assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead);
+
+    //No additional spills when final merge is disabled.
+    assertEquals(numAdditionalSpillsCounter.getValue(), 0);
+
+    BitSet emptyPartitionBits = null;
+    assertTrue(events.size() > 0);
+    //Get the last event
+    int index = events.size() - 1;
+    assertTrue(events.get(index) instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(index);
+    assertEquals(0, cdme.getSourceIndexStart());
+    assertEquals(numOutputs, cdme.getCount());
+    DataMovementEventPayloadProto eventProto =
+        DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
+    assertFalse(eventProto.hasData());
+    //Ensure that this is the last event
+    assertTrue(eventProto.getLastEvent());
+    if (eventProto.hasEmptyPartitions()) {
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
+          .getEmptyPartitions());
+      emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
+      if (numRecordsWritten == 0) {
+        assertEquals(numPartitions, emptyPartitionBits.cardinality());
+      } else {
+        if (skippedPartitions != null) {
+          for (Integer e : skippedPartitions) {
+            assertTrue(emptyPartitionBits.get(e));
+          }
+          assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality());
+        }
+      }
+      if (emptyPartitionBits.cardinality() != numPartitions) {
+        assertEquals(HOST_STRING, eventProto.getHost());
+        assertEquals(SHUFFLE_PORT, eventProto.getPort());
+        assertTrue(eventProto.hasPathComponent());
+      } else {
+        assertFalse(eventProto.hasHost());
+        assertFalse(eventProto.hasPort());
+        assertFalse(eventProto.hasPathComponent());
+      }
+    } else {
+      assertEquals(HOST_STRING, eventProto.getHost());
+      assertEquals(SHUFFLE_PORT, eventProto.getPort());
+      assertTrue(eventProto.hasPathComponent());
+    }
+
+    // Verify if all spill files are available.
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+
+    if (numRecordsWritten > 0) {
+      int numSpills = kvWriter.numSpills.get();
+      for (int i = 0; i < numSpills; i++) {
+        assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10)));
+        assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10)));
+      }
+    } else {
+      return;
+    }
+  }
+
+
   private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
       boolean shouldCompress) throws IOException, InterruptedException {
     PartitionerForTest partitioner = new PartitionerForTest();
-    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
@@ -582,12 +805,18 @@ public class TestUnorderedPartitionedKVWriter {
     doReturn(1).when(outputContext).getTaskVertexIndex();
     doReturn("vertexName").when(outputContext).getTaskVertexName();
     doReturn(uniqueId).when(outputContext).getUniqueIdentifier();
-    ByteBuffer portBuffer = ByteBuffer.allocate(4);
-    portBuffer.mark();
-    portBuffer.putInt(SHUFFLE_PORT);
-    portBuffer.reset();
-    doReturn(portBuffer).when(outputContext).getServiceProviderMetaData(
-        eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+
+    doAnswer(new Answer<ByteBuffer>() {
+      @Override
+      public ByteBuffer answer(InvocationOnMock invocation) throws Throwable {
+        ByteBuffer portBuffer = ByteBuffer.allocate(4);
+        portBuffer.mark();
+        portBuffer.putInt(SHUFFLE_PORT);
+        portBuffer.reset();
+        return portBuffer;
+      }
+    }).when(outputContext).getServiceProviderMetaData(eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+
     Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
     String[] outDirs = new String[] { outDirBase.toString() };
     doReturn(outDirs).when(outputContext).getWorkDirs();

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
index 83ea707..f509acb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
@@ -80,6 +80,10 @@ public class TestUnorderedPartitionedKVOutputConfig {
             .setAvailableBufferSize(1111)
             .setAdditionalConfiguration("fs.shouldExist", "fs")
             .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(TezRuntimeConfiguration
+                .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true")
+            .setAdditionalConfiguration(TezRuntimeConfiguration
+                .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "true")
             .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
                 String.valueOf(false))
             .setAdditionalConfiguration(additionalConf)
@@ -94,6 +98,11 @@ public class TestUnorderedPartitionedKVOutputConfig {
     Configuration conf = rebuilt.conf;
 
     // Verify programmatic API usage
+    assertEquals(true, conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false));
+    //unorderedpartitioned writer ignores this value.
+    assertEquals(false, conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false));
     assertEquals(1111, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
     assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
     assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));

http://git-wip-us.apache.org/repos/asf/tez/blob/e0e19122/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index e0b75b8..721673b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -166,7 +166,7 @@ public class TestOnFileSortedOutput {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
 
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
     OutputContext context = createTezOutputContext();
     UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
@@ -186,7 +186,7 @@ public class TestOnFileSortedOutput {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
 
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, true);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
     OutputContext context = createTezOutputContext();
     UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
@@ -208,7 +208,7 @@ public class TestOnFileSortedOutput {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
     //negative. with sort threads-1
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1);
-    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
 
     OutputContext context = createTezOutputContext();


Mime
View raw message