tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [22/52] [abbrv] tez git commit: TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt (rbalamohan)
Date Fri, 27 Mar 2015 00:57:40 GMT
TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2703f0b0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2703f0b0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2703f0b0

Branch: refs/heads/TEZ-2003
Commit: 2703f0b089d8435dd3f50c115a46a74275b67d80
Parents: a44f5c5
Author: Rajesh Balamohan <rbalamohan@hortonworks.com>
Authored: Mon Mar 23 05:07:06 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@hortonworks.com>
Committed: Mon Mar 23 05:07:06 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/shuffle/impl/ShuffleManager.java     |  69 ++++---
 .../orderedgrouped/ShuffleScheduler.java        |  68 ++++---
 .../impl/TestShuffleInputEventHandlerImpl.java  | 185 +++++++++++++++++++
 ...tShuffleInputEventHandlerOrderedGrouped.java |  60 ++++--
 5 files changed, 316 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a33fe2b..9e11008 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt
   TEZ-2210. Record DAG AM CPU usage stats
   TEZ-2203. Intern strings in tez counters
   TEZ-2202. Fix LocalTaskExecutionThread ID to the standard thread numbering.

http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d2e9682..3995995 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -25,6 +25,7 @@ import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -168,7 +169,7 @@ public class ShuffleManager implements FetcherCallback {
 
  //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source.
   @VisibleForTesting
-  final Map<InputAttemptIdentifier, ShuffleEventInfo> shuffleInfoEventsMap = Maps.newHashMap();
+  final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
 
   // TODO More counters - FetchErrors, speed?
   
@@ -251,6 +252,8 @@ public class ShuffleManager implements FetcherCallback {
 
     Arrays.sort(this.localDisks);
 
+    shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>();
+
     LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
         + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
         + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
@@ -354,6 +357,24 @@ public class ShuffleManager implements FetcherCallback {
       return null;
     }
   }
+
+  /**
+   * Checks that a pipelined-shuffle input does not mix data from different task attempts.
+   *
+   * <p>Only inputs that can be retrieved in chunks are checked. If {@code shuffleInfoEventsMap}
+   * already tracks this input under a different attempt number, a fatal error is reported via
+   * {@code reportFatalError}, because data from the earlier attempt may already have been
+   * merged to memory/disk outputs.
+   *
+   * @param input the source attempt being scheduled or reported as fetched
+   * @return {@code false} if a conflicting attempt was detected and reported;
+   *         {@code true} otherwise
+   */
+  private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier input) {
+    //For pipelined shuffle.
+    //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt
+    if (input.canRetrieveInputInChunks()) {
+      ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
+      if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
+        //speculative attempts or failure attempts. Fail fast here.
+        reportFatalError(new IOException(), input + " already exists. "
+            + "Previous attempt's data could have been already merged "
+            + "to memory/disk outputs.  Failing the fetch early. currentAttemptNum="
+            + eventInfo.attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed
+            + ", newAttemptNum=" + input.getAttemptNumber());
+        return false;
+      }
+    }
+    return true;
+  }
   
   private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
 
@@ -385,12 +406,7 @@ public class ShuffleManager implements FetcherCallback {
       InputAttemptIdentifier input = inputIter.next();
 
       //For pipelined shuffle.
-      //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt
-      if (input.canRetrieveInputInChunks() && input.getAttemptNumber() > 0) {
-        //speculative attempts or failure attempts. Fail fast here.
-        reportFatalError(new IOException(), input + " already exists. "
-            + "Previous attempt's data could have been already merged "
-            + "to memory/disk outputs.  Failing the fetch early instead of adding to fetcher");
+      if (!validateInputAttemptForPipelinedShuffle(input)) {
         continue;
       }
 
@@ -431,18 +447,13 @@ public class ShuffleManager implements FetcherCallback {
       LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
     }
 
-    if (srcAttemptIdentifier.canRetrieveInputInChunks()) {
-      //TODO: need to check for speculative tasks later. TEZ-2132
-      if (srcAttemptIdentifier.getAttemptNumber() > 0) {
-        //speculative attempts or failure attempts. Fail fast here.
-        reportFatalError(new IOException(), srcAttemptIdentifier + " already exists. "
-            + "Previous attempt's data could have been already merged "
-            + "to memory/disk outputs.  Failing the fetch early instead of adding to addKnownInput");
-        return;
-      }
-      if (shuffleInfoEventsMap.get(srcAttemptIdentifier) == null) {
-        shuffleInfoEventsMap.put(srcAttemptIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
-      }
+    if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
+      return;
+    }
+
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    if (shuffleInfoEventsMap.get(inputIdentifier) == null) {
+      shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
     }
 
     host.addKnownInput(srcAttemptIdentifier);
@@ -555,12 +566,14 @@ public class ShuffleManager implements FetcherCallback {
   static class ShuffleEventInfo {
     BitSet eventsProcessed;
     int finalEventId = -1; //0 indexed
+    int attemptNum;
     String id;
 
 
     ShuffleEventInfo(InputAttemptIdentifier input) {
       this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
       this.eventsProcessed = new BitSet();
+      this.attemptNum = input.getAttemptNumber();
     }
 
     void spillProcessed(int spillId) {
@@ -584,7 +597,7 @@ public class ShuffleManager implements FetcherCallback {
 
     public String toString() {
       return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId
-          +  ", id=" + id + "]";
+          +  ", id=" + id + ", attemptNum=" + attemptNum + "]";
     }
   }
 
@@ -699,21 +712,17 @@ public class ShuffleManager implements FetcherCallback {
      * For pipelinedshuffle it is possible to get multiple spills. Claim success only when
      * all spills pertaining to an attempt are done.
      */
-    ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier);
-
-    //TODO: need to check for speculative tasks later. TEZ-2132
-    if (srcAttemptIdentifier.getAttemptNumber() > 0) {
-      //speculative attempts or failure attempts. Fail fast here.
-      reportFatalError(new IOException(), "Previous event already got scheduled for " +
-          srcAttemptIdentifier + ". Previous attempt's data could have been already merged
"
-          + "to memory/disk outputs.  Failing the fetch early.");
+    if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
       return;
     }
 
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier);
+
     //for empty partition case
     if (eventInfo == null && fetchedInput instanceof NullFetchedInput) {
       eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
-      shuffleInfoEventsMap.put(srcAttemptIdentifier, eventInfo);
+      shuffleInfoEventsMap.put(inputIdentifier, eventInfo);
     }
 
     assert(eventInfo != null);
@@ -737,7 +746,7 @@ public class ShuffleManager implements FetcherCallback {
       //check if we downloaded all spills pertaining to this InputAttemptIdentifier
       if (eventInfo.isDone()) {
         adjustCompletedInputs(fetchedInput);
-        shuffleInfoEventsMap.remove(srcAttemptIdentifier);
+        shuffleInfoEventsMap.remove(srcAttemptIdentifier.getInputIdentifier());
       }
     } finally {
       lock.unlock();

http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index d3ee161..5ae10ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -51,6 +51,7 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -79,7 +80,7 @@ class ShuffleScheduler {
  //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
   // enabled in source.
   @VisibleForTesting
-  final Map<InputAttemptIdentifier, ShuffleEventInfo> shuffleInfoEventsMap = Maps.newHashMap();
+  final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
   private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
@@ -164,6 +165,7 @@ class ShuffleScheduler {
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
 
+    shuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -190,12 +192,14 @@ class ShuffleScheduler {
   static class ShuffleEventInfo {
     BitSet eventsProcessed;
     int finalEventId = -1; //0 indexed
+    int attemptNum;
     String id;
 
 
     ShuffleEventInfo(InputAttemptIdentifier input) {
       this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
       this.eventsProcessed = new BitSet();
+      this.attemptNum = input.getAttemptNumber();
     }
 
     void spillProcessed(int spillId) {
@@ -217,7 +221,7 @@ class ShuffleScheduler {
 
     public String toString() {
       return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId
-          +  ", id=" + id + "]";
+          +  ", id=" + id + ", attemptNum=" + attemptNum + "]";
     }
   }
 
@@ -264,21 +268,18 @@ class ShuffleScheduler {
         setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
         numFetchedSpills++;
       } else {
-        ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier);
-
-        //TODO: need to check for speculative tasks later. TEZ-2132
-        if (eventInfo != null && srcAttemptIdentifier.getAttemptNumber() > 0)
{
-          //speculative attempts or failure attempts. Fail fast here.
-          shuffle.reportException(new IOException("Previous event already got scheduled for
" +
-              srcAttemptIdentifier + ". Previous attempt's data could have been already merged
"
-              + "to memory/disk outputs.  Failing the fetch early."));
+        InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+        //Allow only one task attempt to proceed.
+        if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
           return;
         }
 
+        ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier);
+
         //Possible that Shuffle event handler invoked this, due to empty partitions
         if (eventInfo == null && output == null) {
           eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
-          shuffleInfoEventsMap.put(srcAttemptIdentifier, eventInfo);
+          shuffleInfoEventsMap.put(inputIdentifier, eventInfo);
         }
 
         assert(eventInfo != null);
@@ -292,8 +293,8 @@ class ShuffleScheduler {
         //check if we downloaded all spills pertaining to this InputAttemptIdentifier
         if (eventInfo.isDone()) {
           remainingMaps = remainingMaps - 1;
-          setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
-          shuffleInfoEventsMap.remove(srcAttemptIdentifier);
+          setInputFinished(inputIdentifier.getInputIndex());
+          shuffleInfoEventsMap.remove(inputIdentifier);
           if (LOG.isTraceEnabled()) {
             LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " +
                 shuffleInfoEventsMap);
@@ -335,6 +336,28 @@ class ShuffleScheduler {
    // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
   }
 
+  /**
+   * Allows only one task attempt per input to proceed when pipelined shuffle is used.
+   *
+   * <p>If {@code shuffleInfoEventsMap} already tracks this input under a different attempt
+   * number, an IOException is passed to {@link #reportExceptionForInput(Exception)}, because
+   * data from the earlier attempt may already have been merged to memory/disk outputs.
+   *
+   * @param input the source attempt being scheduled or copied
+   * @return {@code false} if a conflicting attempt was found and reported; {@code true} otherwise
+   */
+  private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier input) {
+    //For pipelined shuffle.
+    //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt
+    if (input.canRetrieveInputInChunks()) {
+      ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
+      if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
+        reportExceptionForInput(new IOException("Previous event already got scheduled for " +
+            input + ". Previous attempt's data could have been already merged "
+            + "to memory/disk outputs.  Failing the fetch early. currentAttemptNum="
+            + eventInfo.attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed
+            + ", newAttemptNum=" + input.getAttemptNumber()));
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Logs the given exception at FATAL level and forwards it to {@code shuffle.reportException}.
+   * Package-visible for tests.
+   *
+   * @param exception the failure to report
+   */
+  @VisibleForTesting
+  void reportExceptionForInput(Exception exception) {
+    // Pass the exception as the Throwable argument so its stack trace is logged,
+    // not just its toString().
+    LOG.fatal(exception.getMessage(), exception);
+    shuffle.reportException(exception);
+  }
 
   private void logProgress() {
     double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
@@ -496,17 +519,12 @@ class ShuffleScheduler {
       mapLocations.put(identifier, host);
     }
 
-    if (srcAttempt.canRetrieveInputInChunks()) {
-      if (shuffleInfoEventsMap.get(srcAttempt) == null) {
-        //TODO: need to check for speculative tasks later. TEZ-2132
-        shuffleInfoEventsMap.put(srcAttempt, new ShuffleEventInfo(srcAttempt));
-      } else if (srcAttempt.getAttemptNumber() > 0) {
-        //speculative attempts or failure attempts. Fail fast here.
-        shuffle.reportException(new IOException(srcAttempt + " already exists. "
-            + "Previous attempt's data could have been already merged "
-            + "to memory/disk outputs.  Failing the fetch early."));
-        return;
-      }
+    //Allow only one task attempt to proceed.
+    if (!validateInputAttemptForPipelinedShuffle(srcAttempt)) {
+      return;
+    }
+    if (shuffleInfoEventsMap.get(srcAttempt.getInputIdentifier()) == null) {
+      shuffleInfoEventsMap.put(srcAttempt.getInputIdentifier(), new ShuffleEventInfo(srcAttempt));
     }
 
     host.addKnownMap(srcAttempt);
@@ -523,7 +541,7 @@ class ShuffleScheduler {
   public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
     LOG.info("Adding obsolete input: " + srcAttempt);
-    if (shuffleInfoEventsMap.containsKey(srcAttempt)) {
+    if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
       //Fail fast here.
       shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput,
but it "
           + "exists in shuffleInfoEventMap. Some data could have been already merged "

http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 43edf15..44122a2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -18,21 +18,41 @@
 
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.junit.Test;
@@ -44,6 +64,7 @@ public class TestShuffleInputEventHandlerImpl {
   private static final String HOST = "localhost";
   private static final int PORT = 8080;
   private static final String PATH_COMPONENT = "attempttmp";
+  private final Configuration conf = new Configuration();
 
   @Test(timeout = 5000)
   public void testSimple() throws IOException {
@@ -133,6 +154,170 @@ public class TestShuffleInputEventHandlerImpl {
     verify(shuffleManager).addCompletedInputWithNoData(eq(expectedIdentifier1));
     verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier2), eq(0));
   }
+
+
+  /**
+   * Builds a Mockito mock of {@link InputContext} whose {@code getCounters()} returns one
+   * {@link TezCounters} instance and whose source vertex name is "sourceVertex".
+   */
+  private InputContext createInputContext() {
+    InputContext mockedContext = mock(InputContext.class);
+    TezCounters counters = new TezCounters();
+    doReturn(counters).when(mockedContext).getCounters();
+    doReturn("sourceVertex").when(mockedContext).getSourceVertexName();
+    return mockedContext;
+  }
+
+  /**
+   * Creates a Mockito spy around a real {@link ShuffleManager} wired to the given context.
+   *
+   * <p>The context is stubbed with a single work dir ("./outDir"), which is also written to
+   * {@code TezRuntimeFrameworkConfigs.LOCAL_DIRS} in {@code conf}. A serialized job token is
+   * stubbed as the consumer metadata for {@code TEZ_SHUFFLE_HANDLER_SERVICE_ID}.
+   *
+   * @param inputContext mocked context, e.g. from {@code createInputContext()}
+   * @return a spy, so tests can verify calls such as {@code addKnownInput}
+   * @throws IOException propagated from token serialization or ShuffleManager setup
+   */
+  private ShuffleManager createShuffleManager(InputContext inputContext) throws IOException {
+    Path outDirBase = new Path(".", "outDir");
+    String[] outDirs = new String[] { outDirBase.toString() };
+    doReturn(outDirs).when(inputContext).getWorkDirs();
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs());
+
+    DataOutputBuffer out = new DataOutputBuffer();
+    // Typed constructor: avoids the raw Token type and its unchecked-conversion warning.
+    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),
+        new JobTokenSecretManager(null));
+    token.write(out);
+    doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData(
+        TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
+
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+    ShuffleManager realShuffleManager = new ShuffleManager(inputContext, conf, 2,
+        1024, false, -1, null, inputAllocator);
+    ShuffleManager shuffleManager = spy(realShuffleManager);
+    return shuffleManager;
+  }
+
+  /**
+   * In pipelined shuffle, checks that every spill of a single attempt is scheduled through
+   * {@code addKnownInput}, and that an event from a different attempt of the same input is
+   * reported through {@code InputContext.fatalError}.
+   *
+   * @throws IOException propagated from test setup or event handling
+   */
+  @Test(timeout = 5000)
+  public void testPipelinedShuffleEvents() throws IOException {
+
+    InputContext inputContext = createInputContext();
+    ShuffleManager shuffleManager = createShuffleManager(inputContext);
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+
+    ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
+        shuffleManager, inputAllocator, null, false, 0);
+
+    //0--> 1 with spill id 0 (attemptNum 0)
+    Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
+    handler.handleEvents(Collections.singletonList(dme));
+
+    InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0));
+
+    //0--> 1 with spill id 1 (attemptNum 0)
+    dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
+    handler.handleEvents(Collections.singletonList(dme));
+
+    InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
+    // NOTE(review): times(2) only holds if InputAttemptIdentifier equality ignores the spill
+    // id, so that both the spill-0 and spill-1 calls match expectedId2 -- confirm.
+    verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0));
+
+    //0--> 1 with spill id 1 (attemptNum 1).  This should report exception
+    dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1);
+    handler.handleEvents(Collections.singletonList(dme));
+    verify(inputContext).fatalError(any(Throwable.class), anyString());
+  }
+
+  /**
+   * In pipelined shuffle, checks processing when attempts arrive out of order (e.g. attemptNum 1
+   * arrives before attemptNum 0). The first attempt seen is scheduled, and the later, different
+   * attempt is reported through {@code InputContext.fatalError}.
+   *
+   * @throws IOException propagated from test setup or event handling
+   */
+  @Test(timeout = 5000)
+  public void testPipelinedShuffleEvents_WithOutOfOrderAttempts() throws IOException {
+    InputContext inputContext = createInputContext();
+    ShuffleManager shuffleManager = createShuffleManager(inputContext);
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+
+    ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
+        shuffleManager, inputAllocator, null, false, 0);
+
+    //0--> 1 with spill id 0 (attemptNum 1).  attemptNum 0 is not sent.
+    Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
+    handler.handleEvents(Collections.singletonList(dme));
+
+    // Spill id 0 matches the event sent above.
+    InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 1,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0));
+
+    //Now send attemptNum 0.  This should throw exception, because attempt #1 is already added
+    dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
+    handler.handleEvents(Collections.singletonList(dme));
+    verify(inputContext).fatalError(any(Throwable.class), anyString());
+  }
+
+  /**
+   * In pipelined shuffle, checks that spills whose partitions are all empty are reported via
+   * {@code addCompletedInputWithNoData}, and that a later event from a different attempt of the
+   * same input is reported through {@code InputContext.fatalError}.
+   *
+   * @throws IOException propagated from test setup or event handling
+   */
+  @Test(timeout = 5000)
+  public void testPipelinedShuffleEvents_WithEmptyPartitions() throws IOException {
+    InputContext inputContext = createInputContext();
+    ShuffleManager shuffleManager = createShuffleManager(inputContext);
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+
+    ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext,
+        shuffleManager, inputAllocator, null, false, 0);
+
+    //0--> 1 with spill id 0 (attemptNum 0) with empty partitions
+    BitSet bitSet = new BitSet(4);
+    bitSet.flip(0, 4);
+    Event dme = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0);
+    handler.handleEvents(Collections.singletonList(dme));
+
+    InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    verify(shuffleManager, times(1)).addCompletedInputWithNoData(expected);
+
+    //0--> 1 with spill id 1 (attemptNum 0) with empty partitions.
+    //Create the spill-1 event before handling it, so spill 1 (not spill 0 again) is delivered.
+    dme = createDataMovementEvent(true, 0, 1, 1, false, bitSet, 4, 0);
+    handler.handleEvents(Collections.singletonList(dme));
+    expected = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
+    // NOTE(review): times(2) relies on InputAttemptIdentifier equality ignoring the spill id,
+    // so that both the spill-0 and spill-1 calls match -- confirm.
+    verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected);
+
+    //Now send attemptNum 1.  This should throw exception, because attempt #0 is already added
+    dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
+    handler.handleEvents(Collections.singletonList(dme));
+    verify(inputContext).fatalError(any(Throwable.class), anyString());
+  }
+
+  /**
+   * Builds a {@link DataMovementEvent} carrying a shuffle payload for pipelined-shuffle tests.
+   *
+   * @param addSpillDetails whether to write the spill id and last-event flag into the payload
+   * @param srcIdx source index passed to {@code DataMovementEvent.create}
+   * @param targetIdx target index passed to {@code DataMovementEvent.create}
+   * @param spillId spill id, written only when {@code addSpillDetails} is set
+   * @param isLastSpill last-event flag, written only when {@code addSpillDetails} is set
+   * @param emptyPartitions partitions to mark as empty (compressed into the payload when any
+   *        bit is set)
+   * @param numPartitions total partition count; host/port/path are set only when at least one
+   *        partition is not empty
+   * @param attemptNum attempt number, passed as the third argument of
+   *        {@code DataMovementEvent.create}
+   * @return the constructed event
+   * @throws IOException if the empty-partition bitmap cannot be compressed
+   */
+  private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, int targetIdx,
+      int spillId, boolean isLastSpill, BitSet emptyPartitions, int numPartitions, int attemptNum)
+      throws IOException {
+
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+
+    if (emptyPartitions.cardinality() != 0) {
+      // Empty partitions exist
+      ByteString emptyPartitionsByteString =
+          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions));
+      payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
+    }
+
+    if (emptyPartitions.cardinality() != numPartitions) {
+      // Populate payload only if at least 1 partition has data. Use PATH_COMPONENT so the
+      // payload agrees with the expected InputAttemptIdentifiers built in the tests.
+      payloadBuilder.setHost(HOST);
+      payloadBuilder.setPort(PORT);
+      payloadBuilder.setPathComponent(PATH_COMPONENT);
+    }
+
+    if (addSpillDetails) {
+      payloadBuilder.setSpillId(spillId);
+      payloadBuilder.setLastEvent(isLastSpill);
+    }
+
+    ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
+    return DataMovementEvent.create(srcIdx, targetIdx, attemptNum, payload);
+  }
   
   private Event createDataMovementEvent(int srcIndex, int targetIndex,
       ByteString emptyPartitionByteString) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 700d9a5..460db01 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -27,10 +27,12 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -77,22 +79,28 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
 
   private Event createDataMovementEvent(int srcIndex, int targetIndex,
       ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
-      finalMergeDisabled, boolean incrementalEvent) {
+      finalMergeDisabled, boolean incrementalEvent, int spillId) {
     return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString,
-        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, 0);
+        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, PORT);
   }
 
   private Event createDataMovementEvent(int srcIndex, int targetIndex,
       ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
-      finalMergeDisabled, boolean incrementalEvent, int spillId) {
+      finalMergeDisabled, boolean incrementalEvent, int spillId, int attemptNum) {
     return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString,
-        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, PORT);
+        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, PORT, attemptNum);
   }
 
-
   private Event createDataMovementEvent(int srcIndex, int targetIndex,
       ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
       finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port) {
+    return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString,
+        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, host, port, 0);
+  }
+
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
+      finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port, int attemptNum) {
     ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder =
         ShuffleUserPayloads.DataMovementEventPayloadProto
             .newBuilder();
@@ -110,7 +118,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
       builder.setEmptyPartitions(emptyPartitionByteString);
     }
     return DataMovementEvent
-        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
+        .create(srcIndex, targetIndex, attemptNum, builder.build().toByteString().asReadOnlyByteBuffer());
   }

   @Before
@@ -139,7 +147,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         inputContext,
         config,
         numInputs,
-        null,
+        mock(Shuffle.class),
         shuffledInputsCounter,
         reduceShuffleBytes,
         reduceDataSizeDecompressed,
@@ -153,8 +161,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     mergeManager = mock(MergeManager.class);
   }
 
-  @Test
-  public void testFinalMergeDisabledEvents() throws IOException, InterruptedException {
+  @Test (timeout = 10000)
+  public void testPiplinedShuffleEvents() throws IOException, InterruptedException {
     //test with 2 events per input (2 inputs)
     int attemptNum = 0;
     int inputIdx = 0;
@@ -166,7 +174,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     int partitionId = attemptNum;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1));
-    verify(scheduler).shuffleInfoEventsMap.containsKey(id1);
+    verify(scheduler).shuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
 
     //Send final_update event.
     Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
@@ -176,9 +184,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(Collections.singletonList(dme2));
     baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     partitionId = attemptNum;
-    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2));
+    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2));
-    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2));
+    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
 
     MapHost host = scheduler.getHost();
     assertTrue(host != null);
@@ -213,6 +221,34 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     assertTrue(scheduler.isDone());
   }
 
+  @Test (timeout = 5000)
+  public void testPiplinedShuffleEvents_WithOutofOrderAttempts() throws IOException, InterruptedException {
+    //Process attempt #1 first
+    int attemptNum = 1;
+    int inputIdx = 1;
+    String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
+
+    Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
+    handler.handleEvents(Collections.singletonList(dme1));
+
+    InputAttemptIdentifier id1 =
+        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+
+    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
+
+    //Attempt #0 comes up. When processing this, it should report exception
+    attemptNum = 0;
+    inputIdx = 1;
+    Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
+    handler.handleEvents(Collections.singletonList(dme2));
+
+    InputAttemptIdentifier id2 =
+        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class));
+  }
+
   @Test(timeout = 5000)
   public void basicTest() throws IOException {
     List<Event> events = new LinkedList<Event>();


Mime
View raw message