Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0930317BDC for ; Fri, 27 Mar 2015 00:57:30 +0000 (UTC) Received: (qmail 16757 invoked by uid 500); 27 Mar 2015 00:57:20 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 16667 invoked by uid 500); 27 Mar 2015 00:57:20 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 15499 invoked by uid 99); 27 Mar 2015 00:57:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Mar 2015 00:57:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A21B2E2F48; Fri, 27 Mar 2015 00:57:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 27 Mar 2015 00:57:40 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [22/52] [abbrv] tez git commit: TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt (rbalamohan) TEZ-2209. 
Fix pipelined shuffle to fetch data from any one attempt (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2703f0b0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2703f0b0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2703f0b0 Branch: refs/heads/TEZ-2003 Commit: 2703f0b089d8435dd3f50c115a46a74275b67d80 Parents: a44f5c5 Author: Rajesh Balamohan Authored: Mon Mar 23 05:07:06 2015 +0530 Committer: Rajesh Balamohan Committed: Mon Mar 23 05:07:06 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/shuffle/impl/ShuffleManager.java | 69 ++++--- .../orderedgrouped/ShuffleScheduler.java | 68 ++++--- .../impl/TestShuffleInputEventHandlerImpl.java | 185 +++++++++++++++++++ ...tShuffleInputEventHandlerOrderedGrouped.java | 60 ++++-- 5 files changed, 316 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a33fe2b..9e11008 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt TEZ-2210. Record DAG AM CPU usage stats TEZ-2203. Intern strings in tez counters TEZ-2202. Fix LocalTaskExecutionThread ID to the standard thread numbering. 
http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index d2e9682..3995995 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -25,6 +25,7 @@ import java.text.DecimalFormat; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -168,7 +169,7 @@ public class ShuffleManager implements FetcherCallback { //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source. @VisibleForTesting - final Map shuffleInfoEventsMap = Maps.newHashMap(); + final Map shuffleInfoEventsMap; // TODO More counters - FetchErrors, speed? @@ -251,6 +252,8 @@ public class ShuffleManager implements FetcherCallback { Arrays.sort(this.localDisks); + shuffleInfoEventsMap = new ConcurrentHashMap(); + LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec=" + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers=" + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled=" @@ -354,6 +357,24 @@ public class ShuffleManager implements FetcherCallback { return null; } } + + private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier input) { + //For pipelined shuffle. + //TODO: TEZ-2132 for error handling. 
As of now, fail fast if there is a different attempt + if (input.canRetrieveInputInChunks()) { + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); + if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) { + //speculative attempts or failure attempts. Fail fast here. + reportFatalError(new IOException(), input + " already exists. " + + "Previous attempt's data could have been already merged " + + "to memory/disk outputs. Failing the fetch early. currentAttemptNum=" + eventInfo + .attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed + ", newAttemptNum=" + + input.getAttemptNumber()); + return false; + } + } + return true; + } private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { @@ -385,12 +406,7 @@ public class ShuffleManager implements FetcherCallback { InputAttemptIdentifier input = inputIter.next(); //For pipelined shuffle. - //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt - if (input.canRetrieveInputInChunks() && input.getAttemptNumber() > 0) { - //speculative attempts or failure attempts. Fail fast here. - reportFatalError(new IOException(), input + " already exists. " - + "Previous attempt's data could have been already merged " - + "to memory/disk outputs. Failing the fetch early instead of adding to fetcher"); + if (!validateInputAttemptForPipelinedShuffle(input)) { continue; } @@ -431,18 +447,13 @@ public class ShuffleManager implements FetcherCallback { LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host); } - if (srcAttemptIdentifier.canRetrieveInputInChunks()) { - //TODO: need to check for speculative tasks later. TEZ-2132 - if (srcAttemptIdentifier.getAttemptNumber() > 0) { - //speculative attempts or failure attempts. Fail fast here. - reportFatalError(new IOException(), srcAttemptIdentifier + " already exists. 
" - + "Previous attempt's data could have been already merged " - + "to memory/disk outputs. Failing the fetch early instead of adding to addKnownInput"); - return; - } - if (shuffleInfoEventsMap.get(srcAttemptIdentifier) == null) { - shuffleInfoEventsMap.put(srcAttemptIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); - } + if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { + return; + } + + InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + if (shuffleInfoEventsMap.get(inputIdentifier) == null) { + shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); } host.addKnownInput(srcAttemptIdentifier); @@ -555,12 +566,14 @@ public class ShuffleManager implements FetcherCallback { static class ShuffleEventInfo { BitSet eventsProcessed; int finalEventId = -1; //0 indexed + int attemptNum; String id; ShuffleEventInfo(InputAttemptIdentifier input) { this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); this.eventsProcessed = new BitSet(); + this.attemptNum = input.getAttemptNumber(); } void spillProcessed(int spillId) { @@ -584,7 +597,7 @@ public class ShuffleManager implements FetcherCallback { public String toString() { return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId - + ", id=" + id + "]"; + + ", id=" + id + ", attemptNum=" + attemptNum + "]"; } } @@ -699,21 +712,17 @@ public class ShuffleManager implements FetcherCallback { * For pipelinedshuffle it is possible to get multiple spills. Claim success only when * all spills pertaining to an attempt are done. */ - ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(srcAttemptIdentifier); - - //TODO: need to check for speculative tasks later. TEZ-2132 - if (srcAttemptIdentifier.getAttemptNumber() > 0) { - //speculative attempts or failure attempts. Fail fast here. - reportFatalError(new IOException(), "Previous event already got scheduled for " + - srcAttemptIdentifier + ". 
Previous attempt's data could have been already merged " - + "to memory/disk outputs. Failing the fetch early."); + if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { return; } + InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier); + //for empty partition case if (eventInfo == null && fetchedInput instanceof NullFetchedInput) { eventInfo = new ShuffleEventInfo(srcAttemptIdentifier); - shuffleInfoEventsMap.put(srcAttemptIdentifier, eventInfo); + shuffleInfoEventsMap.put(inputIdentifier, eventInfo); } assert(eventInfo != null); @@ -737,7 +746,7 @@ public class ShuffleManager implements FetcherCallback { //check if we downloaded all spills pertaining to this InputAttemptIdentifier if (eventInfo.isDone()) { adjustCompletedInputs(fetchedInput); - shuffleInfoEventsMap.remove(srcAttemptIdentifier); + shuffleInfoEventsMap.remove(srcAttemptIdentifier.getInputIdentifier()); } } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index d3ee161..5ae10ab 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -51,6 +51,7 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import 
org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; @@ -79,7 +80,7 @@ class ShuffleScheduler { //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. @VisibleForTesting - final Map shuffleInfoEventsMap = Maps.newHashMap(); + final Map shuffleInfoEventsMap; private Set pendingHosts = new HashSet(); private Set obsoleteInputs = new HashSet(); @@ -164,6 +165,7 @@ class ShuffleScheduler { this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); + shuffleInfoEventsMap = new HashMap(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting @@ -190,12 +192,14 @@ class ShuffleScheduler { static class ShuffleEventInfo { BitSet eventsProcessed; int finalEventId = -1; //0 indexed + int attemptNum; String id; ShuffleEventInfo(InputAttemptIdentifier input) { this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber(); this.eventsProcessed = new BitSet(); + this.attemptNum = input.getAttemptNumber(); } void spillProcessed(int spillId) { @@ -217,7 +221,7 @@ class ShuffleScheduler { public String toString() { return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId - + ", id=" + id + "]"; + + ", id=" + id + ", attemptNum=" + attemptNum + "]"; } } @@ -264,21 +268,18 @@ class ShuffleScheduler { setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex()); numFetchedSpills++; } else { - ShuffleEventInfo eventInfo 
= shuffleInfoEventsMap.get(srcAttemptIdentifier); - - //TODO: need to check for speculative tasks later. TEZ-2132 - if (eventInfo != null && srcAttemptIdentifier.getAttemptNumber() > 0) { - //speculative attempts or failure attempts. Fail fast here. - shuffle.reportException(new IOException("Previous event already got scheduled for " + - srcAttemptIdentifier + ". Previous attempt's data could have been already merged " - + "to memory/disk outputs. Failing the fetch early.")); + InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); + //Allow only one task attempt to proceed. + if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { return; } + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier); + //Possible that Shuffle event handler invoked this, due to empty partitions if (eventInfo == null && output == null) { eventInfo = new ShuffleEventInfo(srcAttemptIdentifier); - shuffleInfoEventsMap.put(srcAttemptIdentifier, eventInfo); + shuffleInfoEventsMap.put(inputIdentifier, eventInfo); } assert(eventInfo != null); @@ -292,8 +293,8 @@ class ShuffleScheduler { //check if we downloaded all spills pertaining to this InputAttemptIdentifier if (eventInfo.isDone()) { remainingMaps = remainingMaps - 1; - setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex()); - shuffleInfoEventsMap.remove(srcAttemptIdentifier); + setInputFinished(inputIdentifier.getInputIndex()); + shuffleInfoEventsMap.remove(inputIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + shuffleInfoEventsMap); @@ -335,6 +336,28 @@ class ShuffleScheduler { // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation. } + private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier input) { + //For pipelined shuffle. + //TODO: TEZ-2132 for error handling. 
As of now, fail fast if there is a different attempt + if (input.canRetrieveInputInChunks()) { + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); + if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) { + reportExceptionForInput(new IOException("Previous event already got scheduled for " + + input + ". Previous attempt's data could have been already merged " + + "to memory/disk outputs. Failing the fetch early. currentAttemptNum=" + + eventInfo.attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed + + ", newAttemptNum=" + input.getAttemptNumber())); + return false; + } + } + return true; + } + + @VisibleForTesting + void reportExceptionForInput(Exception exception) { + LOG.fatal(exception); + shuffle.reportException(exception); + } private void logProgress() { double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); @@ -496,17 +519,12 @@ class ShuffleScheduler { mapLocations.put(identifier, host); } - if (srcAttempt.canRetrieveInputInChunks()) { - if (shuffleInfoEventsMap.get(srcAttempt) == null) { - //TODO: need to check for speculative tasks later. TEZ-2132 - shuffleInfoEventsMap.put(srcAttempt, new ShuffleEventInfo(srcAttempt)); - } else if (srcAttempt.getAttemptNumber() > 0) { - //speculative attempts or failure attempts. Fail fast here. - shuffle.reportException(new IOException(srcAttempt + " already exists. " - + "Previous attempt's data could have been already merged " - + "to memory/disk outputs. Failing the fetch early.")); - return; - } + //Allow only one task attempt to proceed. 
+ if (!validateInputAttemptForPipelinedShuffle(srcAttempt)) { + return; + } + if (shuffleInfoEventsMap.get(srcAttempt.getInputIdentifier()) == null) { + shuffleInfoEventsMap.put(srcAttempt.getInputIdentifier(), new ShuffleEventInfo(srcAttempt)); } host.addKnownMap(srcAttempt); @@ -523,7 +541,7 @@ class ShuffleScheduler { public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) { // The incoming srcAttempt does not contain a path component. LOG.info("Adding obsolete input: " + srcAttempt); - if (shuffleInfoEventsMap.containsKey(srcAttempt)) { + if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { //Fail fast here. shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it " + "exists in shuffleInfoEventMap. Some data could have been already merged " http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index 43edf15..44122a2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -18,21 +18,41 @@ package org.apache.tez.runtime.library.common.shuffle.impl; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.BitSet; +import java.util.Collections; import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.token.Token; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; import org.junit.Test; @@ -44,6 +64,7 @@ public class TestShuffleInputEventHandlerImpl { private static final String HOST = "localhost"; private static final int PORT = 8080; private static final String PATH_COMPONENT = "attempttmp"; + private final Configuration conf = new Configuration(); @Test(timeout = 5000) public void testSimple() throws IOException { @@ -133,6 +154,170 @@ public class TestShuffleInputEventHandlerImpl { verify(shuffleManager).addCompletedInputWithNoData(eq(expectedIdentifier1)); verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier2), eq(0)); } + + + private InputContext createInputContext() { + 
InputContext inputContext = mock(InputContext.class); + doReturn(new TezCounters()).when(inputContext).getCounters(); + doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + return inputContext; + } + + @SuppressWarnings("unchecked") + private ShuffleManager createShuffleManager(InputContext inputContext) throws IOException { + Path outDirBase = new Path(".", "outDir"); + String[] outDirs = new String[] { outDirBase.toString() }; + doReturn(outDirs).when(inputContext).getWorkDirs(); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs()); + + DataOutputBuffer out = new DataOutputBuffer(); + Token token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null)); + token.write(out); + doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData( + TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID); + + FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); + ShuffleManager realShuffleManager = new ShuffleManager(inputContext, conf, 2, + 1024, false, -1, null, inputAllocator); + ShuffleManager shuffleManager = spy(realShuffleManager); + return shuffleManager; + } + + /** + * In pipelined shuffle, check if multiple attempt numbers are processed and + * exceptions are reported properly. 
+ * + * @throws IOException + */ + @Test(timeout = 5000) + public void testPipelinedShuffleEvents() throws IOException { + + InputContext inputContext = createInputContext(); + ShuffleManager shuffleManager = createShuffleManager(inputContext); + FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); + + ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, + shuffleManager, inputAllocator, null, false, 0); + + //0--> 1 with spill id 0 (attemptNum 0) + Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); + handler.handleEvents(Collections.singletonList(dme)); + + InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(new InputIdentifier(1), 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0)); + + //0--> 1 with spill id 1 (attemptNum 0) + dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); + handler.handleEvents(Collections.singletonList(dme)); + + InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(new InputIdentifier(1), 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0)); + + //0--> 1 with spill id 1 (attemptNum 1). 
This should report exception + dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1); + handler.handleEvents(Collections.singletonList(dme)); + verify(inputContext).fatalError(any(Throwable.class), anyString()); + } + + /** + * In pipelined shuffle, check if processing & exceptions are done correctly when attempts are + * received in out of order fashion (e.g attemptNum 1 arrives before attemptNum 0) + * + * @throws IOException + */ + @Test(timeout = 5000) + public void testPipelinedShuffleEvents_WithOutOfOrderAttempts() throws IOException { + InputContext inputContext = createInputContext(); + ShuffleManager shuffleManager = createShuffleManager(inputContext); + FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); + + ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, + shuffleManager, inputAllocator, null, false, 0); + + //0--> 1 with spill id 0 (attemptNum 1). attemptNum 0 is not sent. + Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); + handler.handleEvents(Collections.singletonList(dme)); + + InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 1, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0)); + + //Now send attemptNum 0. 
This should throw exception, because attempt #1 is already added + dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); + handler.handleEvents(Collections.singletonList(dme)); + verify(inputContext).fatalError(any(Throwable.class), anyString()); + } + + /** + * In pipelined shuffle, check if processing & exceptions are done correctly when empty + * partitions are sent + * + * @throws IOException + */ + @Test(timeout = 5000) + public void testPipelinedShuffleEvents_WithEmptyPartitions() throws IOException { + InputContext inputContext = createInputContext(); + ShuffleManager shuffleManager = createShuffleManager(inputContext); + FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); + + ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, + shuffleManager, inputAllocator, null, false, 0); + + //0--> 1 with spill id 0 (attemptNum 0) with empty partitions + BitSet bitSet = new BitSet(4); + bitSet.flip(0, 4); + Event dme = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0); + handler.handleEvents(Collections.singletonList(dme)); + + InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + verify(shuffleManager, times(1)).addCompletedInputWithNoData(expected); + + //0--> 1 with spill id 1 (attemptNum 0) + handler.handleEvents(Collections.singletonList(dme)); + dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); + expected = new InputAttemptIdentifier(new InputIdentifier(1), 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected); + + + //Now send attemptNum 1. 
This should throw exception, because attempt #0 is already added + dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); + handler.handleEvents(Collections.singletonList(dme)); + verify(inputContext).fatalError(any(Throwable.class), anyString()); + } + + private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, int targetIdx, + int spillId, boolean isLastSpill, BitSet emptyPartitions, int numPartitions, int attemptNum) + throws IOException { + + DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto + .newBuilder(); + + if (emptyPartitions.cardinality() != 0) { + // Empty partitions exist + ByteString emptyPartitionsByteString = + TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions)); + payloadBuilder.setEmptyPartitions(emptyPartitionsByteString); + } + + if (emptyPartitions.cardinality() != numPartitions) { + // Populate payload only if at least 1 partition has data + payloadBuilder.setHost(HOST); + payloadBuilder.setPort(PORT); + payloadBuilder.setPathComponent("attemptPath"); + } + + if (addSpillDetails) { + payloadBuilder.setSpillId(spillId); + payloadBuilder.setLastEvent(isLastSpill); + } + + ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer(); + return DataMovementEvent.create(srcIdx, targetIdx, attemptNum, payload); + } private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString) { http://git-wip-us.apache.org/repos/asf/tez/blob/2703f0b0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 700d9a5..460db01 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -27,10 +27,12 @@ import java.util.List; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -77,22 +79,28 @@ public class TestShuffleInputEventHandlerOrderedGrouped { private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean - finalMergeDisabled, boolean incrementalEvent) { + finalMergeDisabled, boolean incrementalEvent, int spillId) { return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, - allPartitionsEmpty, finalMergeDisabled, incrementalEvent, 0); + allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, PORT); } private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean - finalMergeDisabled, boolean incrementalEvent, int spillId) { + finalMergeDisabled, boolean incrementalEvent, int spillId, int attemptNum) { return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, - allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, PORT); + allPartitionsEmpty, finalMergeDisabled, incrementalEvent, 
spillId, HOST, PORT, attemptNum); } - private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port) { + return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, + allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, host, port, 0); + } + + private Event createDataMovementEvent(int srcIndex, int targetIndex, + ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean + finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port, int attemptNum) { ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder = ShuffleUserPayloads.DataMovementEventPayloadProto .newBuilder(); @@ -110,7 +118,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { builder.setEmptyPartitions(emptyPartitionByteString); } return DataMovementEvent - .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer()); + .create(srcIndex, targetIndex, attemptNum, builder.build().toByteString().asReadOnlyByteBuffer()); } @Before @@ -139,7 +147,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { inputContext, config, numInputs, - null, + mock(Shuffle.class), shuffledInputsCounter, reduceShuffleBytes, reduceDataSizeDecompressed, @@ -153,8 +161,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped { mergeManager = mock(MergeManager.class); } - @Test - public void testFinalMergeDisabledEvents() throws IOException, InterruptedException { + @Test (timeout = 10000) + public void testPiplinedShuffleEvents() throws IOException, InterruptedException { //test with 2 events per input (2 inputs) int attemptNum = 0; int inputIdx = 0; @@ -166,7 +174,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); int partitionId = attemptNum; 
verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1)); - verify(scheduler).shuffleInfoEventsMap.containsKey(id1); + verify(scheduler).shuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); //Send final_update event. Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1); @@ -176,9 +184,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme2)); baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); partitionId = attemptNum; - assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2)); + assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2)); - assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2)); + assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); MapHost host = scheduler.getHost(); assertTrue(host != null); @@ -213,6 +221,34 @@ public class TestShuffleInputEventHandlerOrderedGrouped { assertTrue(scheduler.isDone()); } + @Test (timeout = 5000) + public void testPiplinedShuffleEvents_WithOutofOrderAttempts() throws IOException, InterruptedException { + //Process attempt #1 first + int attemptNum = 1; + int inputIdx = 1; + String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); + + Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); + handler.handleEvents(Collections.singletonList(dme1)); + + InputAttemptIdentifier id1 = + new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + + verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); + + //Attempt #0 comes up. 
When processing this, it should report exception + attemptNum = 0; + inputIdx = 1; + Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); + handler.handleEvents(Collections.singletonList(dme2)); + + InputAttemptIdentifier id2 = + new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class)); + } + @Test(timeout = 5000) public void basicTest() throws IOException { List events = new LinkedList();