Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 28E921805B for ; Wed, 6 May 2015 21:39:28 +0000 (UTC) Received: (qmail 70914 invoked by uid 500); 6 May 2015 21:39:28 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 70874 invoked by uid 500); 6 May 2015 21:39:28 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 70864 invoked by uid 99); 6 May 2015 21:39:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 May 2015 21:39:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CCAF5E054C; Wed, 6 May 2015 21:39:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op. (sseth) Date: Wed, 6 May 2015 21:39:27 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 7476fae83 -> 55308630b TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op. 
(sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/55308630 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/55308630 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/55308630 Branch: refs/heads/master Commit: 55308630b6354ce070550d1ea4efbedbbae8e13a Parents: 7476fae Author: Siddharth Seth Authored: Wed May 6 14:39:08 2015 -0700 Committer: Siddharth Seth Committed: Wed May 6 14:39:08 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/lib/MRReaderMapReduce.java | 3 +- .../tez/mapreduce/lib/MRReaderMapred.java | 3 + .../apache/tez/mapreduce/output/MROutput.java | 3 +- .../library/api/IOInterruptedException.java | 40 +++++++ .../tez/runtime/library/api/KeyValueReader.java | 2 + .../tez/runtime/library/api/KeyValueWriter.java | 2 + .../runtime/library/api/KeyValuesReader.java | 1 + .../runtime/library/api/KeyValuesWriter.java | 2 + .../common/readers/UnorderedKVReader.java | 4 +- .../common/shuffle/orderedgrouped/Shuffle.java | 6 +- .../common/sort/impl/ExternalSorter.java | 3 +- .../common/sort/impl/PipelinedSorter.java | 20 ++-- .../common/sort/impl/dflt/DefaultSorter.java | 11 +- .../writers/UnorderedPartitionedKVWriter.java | 3 +- .../input/ConcatenatedMergedKeyValueInput.java | 8 +- .../input/ConcatenatedMergedKeyValuesInput.java | 8 +- .../library/input/OrderedGroupedKVInput.java | 12 +- .../common/readers/TestUnorderedKVReader.java | 22 ++++ .../input/TestOrderedGroupedKVInput.java | 113 +++++++++++++++++++ 20 files changed, 246 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fd45454..c865f12 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.0: Unreleased 
INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op. TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable. Release 0.7.0: Unreleased http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java index 0495751..5fc3e49 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java @@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib; import java.io.IOException; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.mapred.JobConf; @@ -116,7 +117,7 @@ public class MRReaderMapReduce extends MRReader { hasNext = recordReader.nextKeyValue(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("Interrupted while checking for next key-value", e); + throw new IOInterruptedException("Interrupted while checking for next key-value", e); } if (hasNext) { inputRecordCounter.increment(1); http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java index 366e7a7..1bf71f6 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java +++ 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java @@ -117,6 +117,9 @@ public class MRReaderMapred extends MRReader { hasCompletedProcessing(); completedProcessing = true; } + // The underlying reader does not throw InterruptedExceptions. Cannot convert to an + // IOInterruptedException without checking the interrupt flag on each request, which is also + // not guaranteed. Relying on the user to ensure Interrupts are handled correctly. return hasNext; } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index d19f707..a3b19ed 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -499,7 +500,7 @@ public class MROutput extends AbstractLogicalOutput { newRecordWriter.write(key, value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("Interrupted while writing next key-value",e); + throw new IOInterruptedException("Interrupted while writing next key-value",e); } } else { oldRecordWriter.write(key, value); http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java new file mode 100644 index 0000000..776b2a3 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.library.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Indicates that an IOOperation was interrupted + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class IOInterruptedException extends IOException { + + public IOInterruptedException(String message) { + super(message); + } + + public IOInterruptedException(String message, Throwable cause) { + super(message, cause); + } + + public IOInterruptedException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java index d504d08..47f335b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java @@ -49,6 +49,7 @@ public abstract class KeyValueReader extends Reader { * @return true if another key/value(s) pair exists, false if there are no more. 
* @throws IOException * if an error occurs + * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted */ public abstract boolean next() throws IOException; @@ -63,6 +64,7 @@ public abstract class KeyValueReader extends Reader { /** * Returns the current value * @return the current value + * * @throws IOException */ public abstract Object getCurrentValue() throws IOException; http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java index 6acb24b..b5c4294 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java @@ -39,6 +39,8 @@ public abstract class KeyValueWriter extends Writer { * the value to write * @throws IOException * if an error occurs + * @throws {@link IOInterruptedException} if IO was interrupted + * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted */ public abstract void write(Object key, Object value) throws IOException; } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java index 510f4b7..7760818 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java @@ -49,6 +49,7 @@ public abstract class KeyValuesReader extends Reader { * @return true if another key/value(s) pair exists, false if there are no more. * @throws IOException * if an error occurs + * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted */ public abstract boolean next() throws IOException; http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java index 50fc2d6..9cdde43 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java @@ -38,6 +38,8 @@ public abstract class KeyValuesWriter extends KeyValueWriter { * @param values * values to write * @throws java.io.IOException + * @throws {@link IOInterruptedException} if IO was interrupted + * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted */ public abstract void write(Object key, Iterable values) throws IOException; } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index fc2e312..a8dd1b2 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers; import java.io.IOException; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -168,7 +169,6 @@ public class UnorderedKVReader extends KeyValueReader { * * @return true if the next input exists, false otherwise * @throws IOException - * @throws InterruptedException */ private boolean moveToNextInput() throws IOException { if (currentReader != null) { // Close the current reader. @@ -185,7 +185,7 @@ public class UnorderedKVReader extends KeyValueReader { } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for next available input", e); Thread.currentThread().interrupt(); - throw new IOException(e); + throw new IOInterruptedException(e); } if (currentFetchedInput == null) { hasCompletedProcessing(); http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index ee05378..cb12a63 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -305,6 +305,7 @@ public class Shuffle implements ExceptionReporter { kvIter = runShuffleFuture.get(); } catch 
(ExecutionException e) { Throwable cause = e.getCause(); + // Processor interrupted while waiting for errors, will see an InterruptedException. handleThrowable(cause); } if (isShutDown.get()) { @@ -375,7 +376,9 @@ public class Shuffle implements ExceptionReporter { try { kvIter = merger.close(); } catch (Throwable e) { - throw new ShuffleError("Error while doing final merge " , e); + // Set the throwable so that future.get() sees the reported error. + throwable.set(e); + throw new ShuffleError("Error while doing final merge ", e); } mergePhaseTime.setValue(System.currentTimeMillis() - startTime); @@ -513,6 +516,7 @@ public class Shuffle implements ExceptionReporter { LOG.info("Already shutdown. Ignoring error"); } else { LOG.error("ShuffleRunner failed with error", t); + // In case of an abort / Interrupt - the runtime makes sure that this is ignored. inputContext.fatalError(t, "Shuffle Runner Failed"); cleanupIgnoreErrors(); } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index ca4d889..40d22fe 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.Map; import com.google.common.collect.Maps; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -267,7 +268,7 @@ public abstract class ExternalSorter { 
combiner.combine(kvIter, writer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException(e); + throw new IOInterruptedException("Combiner interrupted", e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 030440e..d9de921 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.PriorityQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -37,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -341,7 +343,7 @@ public class PipelinedSorter extends ExternalSorter { mapOutputByteCounter.increment(valend - keystart); } - public void spill() throws IOException { + public void spill() throws IOException { // create spill file final long size = capacity + + (partitions * APPROX_HEADER_LENGTH); @@ -352,7 +354,13 @@ public class PipelinedSorter extends ExternalSorter { FSDataOutputStream out = rfs.create(filename, true, 
4096); try { - merger.ready(); // wait for all the future results from sort threads + try { + merger.ready(); // wait for all the future results from sort threads + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted while waiting for mergers to complete"); + throw new IOInterruptedException("Interrupted while waiting for mergers to complete", e); + } LOG.info("Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { @@ -391,9 +399,6 @@ public class PipelinedSorter extends ExternalSorter { //TODO: honor cache limits indexCacheList.add(spillRec); ++numSpills; - } catch(InterruptedException ie) { - // TODO:the combiner has been interrupted - Thread.currentThread().interrupt(); } finally { out.close(); } @@ -568,6 +573,7 @@ public class PipelinedSorter extends ExternalSorter { cleanup(); } Thread.currentThread().interrupt(); + throw new IOInterruptedException("Interrupted while closing Output", ie); } } @@ -1046,7 +1052,7 @@ public class PipelinedSorter extends ExternalSorter { iter = futureIter.get(); this.add(iter); } - + StringBuilder sb = new StringBuilder(); for(SpanIterator sp: heap) { sb.append(sp.toString()); @@ -1056,7 +1062,7 @@ public class PipelinedSorter extends ExternalSorter { } LOG.info("Heap = " + sb.toString()); return true; - } catch(Exception e) { + } catch(ExecutionException e) { LOG.info(e.toString()); return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 9783c79..afe07f0 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -607,7 +608,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException( + throw new IOInterruptedException( "Buffer interrupted while waiting for the writer", e); } } @@ -644,7 +645,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { LOG.info("Spill thread interrupted"); //Reset status Thread.currentThread().interrupt(); - throw new IOException("Spill failed", e); + throw new IOInterruptedException("Spill failed", e); } } @@ -769,7 +770,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { + " failed : " + ExceptionUtils.getStackTrace(lspillException); outputContext.fatalError(lspillException, logMsg); } - throw new IOException("Spill failed", lspillException); + if (lspillException instanceof InterruptedException) { + throw new IOInterruptedException("Spill failed", lspillException); + } else { + throw new IOException("Spill failed", lspillException); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 37d8be6..9a98cd1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -54,6 +54,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.sort.impl.IFile; @@ -354,7 +355,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit return availableBuffers.take(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting for next buffer", e); + throw new IOInterruptedException("Interrupted while waiting for next buffer", e); } } } else { http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java index 14b1e2c..45784d9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java @@ -64,7 +64,13 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { currentReader = (KeyValueReader) reader; currentReaderIndex++; } catch (Exception e) { - throw new IOException(e); + // An InterruptedException is not expected here since this works off of + // underlying readers which take care of throwing IOInterruptedExceptions + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException(e); + } } } return true; http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java index 2a1e4c6..27ff324 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java @@ -65,7 +65,13 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { currentReader = (KeyValuesReader) reader; currentReaderIndex++; } catch (Exception e) { - throw new IOException(e); + // An InterruptedException is not expected here since this works off of + // underlying readers which take care of throwing IOInterruptedExceptions + if (e instanceof IOException) { + throw (IOException)e; + } else { + throw new IOException(e); + } } } return true; http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 49cf102..12a5955 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -27,6 +27,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -121,7 +123,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { if (!isStarted.get()) { memoryUpdateCallbackHandler.validateUpdateReceived(); // Start the shuffle - copy and merge - shuffle = new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned()); + shuffle = createShuffle(); shuffle.run(); if (LOG.isDebugEnabled()) { LOG.debug("Initialized the handlers in shuffle..Safe to start processing.."); @@ -137,6 +139,11 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { } } + @VisibleForTesting + Shuffle createShuffle() throws IOException { + return new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned()); + } + /** * Check if the input is ready for consumption * @@ -207,6 +214,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { * previous K-V pair will throw an Exception * * @return a KVReader over the sorted input. 
+ * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted */ @Override public KeyValuesReader getReader() throws IOException, TezException { @@ -240,7 +248,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { waitForInputReady(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting for input ready", e); + throw new IOInterruptedException("Interrupted while waiting for input ready", e); } } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java index 51ea42d..80bdc42 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.library.api.IOInterruptedException; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; @@ -48,6 +49,7 @@ import java.util.LinkedList; import static junit.framework.TestCase.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static 
org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -165,4 +167,24 @@ public class TestUnorderedKVReader { } } + @Test(timeout = 5000) + public void testInterruptOnNext() throws IOException, InterruptedException { + ShuffleManager shuffleManager = mock(ShuffleManager.class); + + // Simulate an interrupt while waiting for the next fetched input. + doThrow(new InterruptedException()).when(shuffleManager).getNextInput(); + TezCounters counters = new TezCounters(); + TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED); + UnorderedKVReader reader = + new UnorderedKVReader(shuffleManager, defaultConf, null, false, -1, -1, + inputRecords); + + try { + reader.next(); + fail("No data available to reader. Should not be able to access any record"); + } catch (IOInterruptedException e) { + // Expected exception. Any other should fail the test. + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java new file mode 100644 index 0000000..d4be802 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.input; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.library.api.IOInterruptedException; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestOrderedGroupedKVInput { + + @Test(timeout = 5000) + public void testInterruptWhileAwaitingInput() throws IOException, TezException { + + InputContext inputContext = createMockInputContext(); + OrderedGroupedKVInput kvInput = new OrderedGroupedKVInputForTest(inputContext, 10); + kvInput.initialize(); + + kvInput.start(); + + try { + kvInput.getReader(); + Assert.fail("getReader should not return since underlying inputs are not ready"); + } catch (IOException e) { + Assert.assertTrue(e instanceof IOInterruptedException); + } + + } + + + private InputContext createMockInputContext() throws IOException { + InputContext inputContext = mock(InputContext.class); + Configuration conf = new TezConfiguration(); + UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); + String[] workingDirs = new String[]{"workDir1"}; + TezCounters counters = new TezCounters(); + + + 
doReturn(payLoad).when(inputContext).getUserPayload(); + doReturn(workingDirs).when(inputContext).getWorkDirs(); + doReturn(200 * 1024 * 1024l).when(inputContext).getTotalMemoryAvailableToTask(); + doReturn(counters).when(inputContext).getCounters(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + + if (args[1] instanceof MemoryUpdateCallbackHandler) { + MemoryUpdateCallbackHandler memUpdateCallbackHandler = + (MemoryUpdateCallbackHandler) args[1]; + memUpdateCallbackHandler.memoryAssigned(200 * 1024 * 1024); + } else { + Assert.fail(); + } + return null; + } + }).when(inputContext).requestInitialMemory(any(long.class), + any(MemoryUpdateCallbackHandler.class)); + + return inputContext; + } + + static class OrderedGroupedKVInputForTest extends OrderedGroupedKVInput { + + public OrderedGroupedKVInputForTest(InputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + + Shuffle createShuffle() throws IOException { + Shuffle shuffle = mock(Shuffle.class); + try { + doThrow(new InterruptedException()).when(shuffle).waitForInput(); + } catch (InterruptedException e) { + Assert.fail(); + } catch (TezException e) { + Assert.fail(); + } + return shuffle; + } + } + +}