Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 666E011F5A for ; Wed, 16 Apr 2014 18:48:40 +0000 (UTC) Received: (qmail 78595 invoked by uid 500); 16 Apr 2014 18:48:39 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 78570 invoked by uid 500); 16 Apr 2014 18:48:39 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 78563 invoked by uid 99); 16 Apr 2014 18:48:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Apr 2014 18:48:39 +0000 X-ASF-Spam-Status: No, hits=-2001.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 16 Apr 2014 18:48:35 +0000 Received: (qmail 77031 invoked by uid 99); 16 Apr 2014 18:48:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Apr 2014 18:48:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C528D93C4CA; Wed, 16 Apr 2014 18:48:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: <78935811d3974318b8013dde44b3fc7a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-919. Fix shutdown handling for Shuffle. (sseth) Date: Wed, 16 Apr 2014 18:48:14 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master 36e940c6f -> 6b1871970 TEZ-919. Fix shutdown handling for Shuffle. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6b187197 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6b187197 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6b187197 Branch: refs/heads/master Commit: 6b1871970a4cd21ae5ad96f2e0c60d922066aa53 Parents: 36e940c Author: Siddharth Seth Authored: Wed Apr 16 11:44:41 2014 -0700 Committer: Siddharth Seth Committed: Wed Apr 16 11:44:41 2014 -0700 ---------------------------------------------------------------------- .../library/common/shuffle/impl/Fetcher.java | 102 ++++++-- .../library/common/shuffle/impl/Shuffle.java | 237 +++++++++++++++---- .../common/shuffle/impl/ShuffleScheduler.java | 1 + .../exceptions/InputAlreadyClosedException.java | 38 +++ .../library/input/ShuffledMergedInput.java | 7 +- .../runtime/library/shuffle/common/Fetcher.java | 134 +++++++---- .../shuffle/common/impl/ShuffleManager.java | 109 +++++---- 7 files changed, 474 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java index 32fad76..ac12f23 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java @@ -25,7 +25,6 @@ import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.net.URLConnection; import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.LinkedHashSet; @@ -80,6 +79,7 @@ class Fetcher extends Thread { private final ShuffleClientMetrics metrics; private final Shuffle shuffle; private final int id; + private final String logIdentifier; private static int nextId = 0; private int currentPartition = -1; @@ -104,10 +104,16 @@ class Fetcher extends Thread { private boolean keepAlive = false; + volatile HttpURLConnection connection; + volatile DataInputStream input; + public Fetcher(Configuration job, ShuffleScheduler scheduler, MergeManager merger, ShuffleClientMetrics metrics, - Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException { + Shuffle shuffle, SecretKey jobTokenSecret, + boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, + TezInputContext inputContext) throws IOException { + setDaemon(true); this.scheduler = scheduler; this.merger = merger; this.metrics = metrics; @@ -161,7 +167,8 @@ class Fetcher extends Thread { this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); - setName("fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id); + this.logIdentifier = "fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id; + setName(logIdentifier); setDaemon(true); synchronized (Fetcher.class) { @@ -195,6 +202,7 @@ class Fetcher extends Thread { // Shuffle copyFromHost(host); } finally { + cleanupCurrentConnection(); if (host != null) { scheduler.freeHost(host); metrics.threadFree(); @@ -211,6 +219,7 @@ class Fetcher extends Thread { public void shutDown() throws InterruptedException { this.stopped = true; interrupt(); + cleanupCurrentConnection(); try { join(5000); } catch (InterruptedException ie) { @@ -221,6 +230,31 @@ class Fetcher extends Thread { } } + private Object cleanupLock = new Object(); + private void cleanupCurrentConnection() { + // Synchronizing on cleanupLock to ensure we don't run into a parallel close + // Can't synchronize on the main class itself since that would cause the + // shutdown request to block + synchronized (cleanupLock) { + try { + if (input != null) { + LOG.info("Closing input on " + logIdentifier); + input.close(); + } + if (connection != null) { + LOG.info("Closing connection on " + logIdentifier); + connection.disconnect(); + } + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception while shutting down fetcher " + logIdentifier, e); + } else { + LOG.info("Exception while shutting down fetcher " + logIdentifier + ": " + e.getMessage()); + } + } + } + } + @VisibleForTesting protected HttpURLConnection openConnection(URL url) throws IOException { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); @@ -263,8 +297,6 @@ class Fetcher extends Thread { remaining = new LinkedHashSet(srcAttempts); // Construct the url and connect - DataInputStream input; - HttpURLConnection connection = null; boolean connectSucceeded = false; try { @@ -285,7 +317,14 @@ class Fetcher extends Thread { ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); - connect(connection, connectionTimeout); + connect(connectionTimeout); + + if (stopped) { + LOG.info("Detected fetcher has been shutdown after connection establishment. Returning"); + cleanupCurrentConnection(); + putBackRemainingMapOutputs(host); + return; + } connectSucceeded = true; input = new DataInputStream(new BufferedInputStream(connection.getInputStream(), bufferSize)); @@ -313,6 +352,12 @@ class Fetcher extends Thread { SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret); LOG.info("for url="+msgToEncode+" sent hash and receievd reply"); } catch (IOException ie) { + if (stopped) { + LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown"); + cleanupCurrentConnection(); + putBackRemainingMapOutputs(host); + return; + } if (keepAlive && connection != null) { //Read the response body in case of error. This helps in connection reuse. //Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html @@ -367,9 +412,9 @@ class Fetcher extends Thread { readErrorStream(connection.getErrorStream()); } } - - IOUtils.cleanup(LOG, input); - + + cleanupCurrentConnection(); + // Sanity check if (failedTasks == null && !remaining.isEmpty()) { throw new IOException("server didn't return all expected map outputs: " @@ -503,6 +548,16 @@ class Fetcher extends Thread { metrics.successFetch(); return null; } catch (IOException ioe) { + if (stopped) { + LOG.info("Not reporting fetch failure for exception during data copy: [" + + ioe.getClass().getName() + ", " + ioe.getMessage() + "]"); + cleanupCurrentConnection(); + if (mapOutput != null) { + mapOutput.abort(); // Release resources + } + // Don't need to put back - since that's handled by the invoker + return EMPTY_ATTEMPT_ID_ARRAY; + } ioErrs.increment(1); if (srcAttemptId == null || mapOutput == null) { LOG.info("fetcher#" + id + " failed to read map header" + @@ -610,8 +665,7 @@ class Fetcher extends Thread { * only on the last failure. Instead of connecting with a timeout of * X, we try connecting with a timeout of x < X but multiple times. */ - private void connect(URLConnection connection, int connectionTimeout) - throws IOException { + private void connect(int connectionTimeout) throws IOException { int unit = 0; if (connectionTimeout < 0) { throw new IOException("Invalid timeout " @@ -626,6 +680,14 @@ class Fetcher extends Thread { connection.connect(); break; } catch (IOException ioe) { + + // Don't attempt another connect if already stopped. + if (stopped) { + LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: [" + + ioe.getClass().getName() + ", " + ioe.getMessage() + "]"); + return; + } + // update the total remaining connect-timeout connectionTimeout -= unit; @@ -646,31 +708,31 @@ class Fetcher extends Thread { } private void shuffleToMemory(MapHost host, MapOutput mapOutput, - InputStream input, + InputStream localInput, int decompressedLength, int compressedLength) throws IOException { IFileInputStream checksumIn = - new IFileInputStream(input, compressedLength, ifileReadAhead, ifileReadAheadLength); + new IFileInputStream(localInput, compressedLength, ifileReadAhead, ifileReadAheadLength); - input = checksumIn; + localInput = checksumIn; // Are map-outputs compressed? if (codec != null) { decompressor.reset(); - input = codec.createInputStream(input, decompressor); + localInput = codec.createInputStream(localInput, decompressor); } // Copy map-output into an in-memory buffer byte[] shuffleData = mapOutput.getMemory(); try { - IOUtils.readFully(input, shuffleData, 0, shuffleData.length); + IOUtils.readFully(localInput, shuffleData, 0, shuffleData.length); metrics.inputBytes(shuffleData.length); LOG.info("Read " + shuffleData.length + " bytes from map-output for " + mapOutput.getAttemptIdentifier()); } catch (IOException ioe) { // Close the streams - IOUtils.cleanup(LOG, input); + IOUtils.cleanup(LOG, localInput); // Re-throw throw ioe; @@ -679,7 +741,7 @@ class Fetcher extends Thread { } private void shuffleToDisk(MapHost host, MapOutput mapOutput, - InputStream input, + InputStream localInput, long compressedLength) throws IOException { // Copy data to local-disk @@ -689,7 +751,7 @@ class Fetcher extends Thread { final int BYTES_TO_READ = 64 * 1024; byte[] buf = new byte[BYTES_TO_READ]; while (bytesLeft > 0) { - int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + int n = localInput.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); if (n < 0) { throw new IOException("read past end of stream reading " + mapOutput.getAttemptIdentifier()); @@ -706,7 +768,7 @@ class Fetcher extends Thread { output.close(); } catch (IOException ioe) { // Close the streams - IOUtils.cleanup(LOG, input, output); + IOUtils.cleanup(LOG, localInput, output); // Re-throw throw ioe; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java index a27fa31..b055f16 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java @@ -18,10 +18,13 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import javax.crypto.SecretKey; @@ -42,16 +45,23 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; +import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException; import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Usage: Create instance, setInitialMemoryAllocated(long), run() @@ -66,7 +76,6 @@ public class Shuffle implements ExceptionReporter { private final Configuration conf; private final TezInputContext inputContext; - private final int numInputs; private final ShuffleClientMetrics metrics; @@ -78,22 +87,35 @@ public class Shuffle implements ExceptionReporter { private final CompressionCodec codec; private final boolean ifileReadAhead; private final int ifileReadAheadLength; + private final int numFetchers; private Throwable throwable = null; private String throwingThreadName = null; - private FutureTask runShuffleFuture; + private final RunShuffleCallable runShuffleCallable; + private volatile ListenableFuture runShuffleFuture; + private final ListeningExecutorService executor; + + private final String srcNameTrimmed; + + private final List fetchers; + + private AtomicBoolean isShutDown = new AtomicBoolean(false); + private AtomicBoolean fetchersClosed = new AtomicBoolean(false); + private AtomicBoolean schedulerClosed = new AtomicBoolean(false); + private AtomicBoolean mergerClosed = new AtomicBoolean(false); public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs, long initialMemoryAvailable) throws IOException { this.inputContext = inputContext; this.conf = conf; - this.numInputs = numInputs; this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(), inputContext.getTaskVertexName(), inputContext.getTaskIndex(), this.conf, UserGroupInformation.getCurrentUser().getShortUserName()); + this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName()); + this.jobTokenSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); @@ -149,7 +171,7 @@ public class Shuffle implements ExceptionReporter { scheduler = new ShuffleScheduler( this.inputContext, this.conf, - this.numInputs, + numInputs, this, shuffledInputsCounter, reduceShuffleBytes, @@ -174,24 +196,62 @@ public class Shuffle implements ExceptionReporter { codec, ifileReadAhead, ifileReadAheadLength); + + ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build()); + + int configuredNumFetchers = + conf.getInt( + TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, + TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES); + numFetchers = Math.min(configuredNumFetchers, numInputs); + LOG.info("Num fetchers being started: " + numFetchers); + fetchers = Lists.newArrayListWithCapacity(numFetchers); + + executor = MoreExecutors.listeningDecorator(rawExecutor); + runShuffleCallable = new RunShuffleCallable(); } public void handleEvents(List events) { - eventHandler.handleEvents(events); + if (!isShutDown.get()) { + eventHandler.handleEvents(events); + } else { + LOG.info("Ignoring events since already shutdown. EventCount: " + events.size()); + } + } /** * Indicates whether the Shuffle and Merge processing is complete. * @return false if not complete, true if complete or if an error occurred. + * @throws InterruptedException + * @throws IOException + * @throws InputAlreadyClosedException */ - public boolean isInputReady() { + // ZZZ Deal with these methods. + public boolean isInputReady() throws IOException, InterruptedException { + if (isShutDown.get()) { + throw new InputAlreadyClosedException(); + } + if (throwable != null) { + handleThrowable(throwable); + } if (runShuffleFuture == null) { return false; } - // TODO This may return true, followed by the reader throwing the actual Exception. - // Fix as part of TEZ-919. + // Don't need to check merge status, since runShuffleFuture will only + // complete once merge is complete. return runShuffleFuture.isDone(); - //return scheduler.isDone() && merger.isMergeComplete(); + } + + private void handleThrowable(Throwable t) throws IOException, InterruptedException { + if (t instanceof IOException) { + throw (IOException) t; + } else if (t instanceof InterruptedException) { + throw (InterruptedException) t; + } else { + throw new UndeclaredThrowableException(t); + } } /** @@ -200,56 +260,59 @@ public class Shuffle implements ExceptionReporter { * @throws IOException * @throws InterruptedException */ + // ZZZ Deal with these methods. public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException { Preconditions.checkState(runShuffleFuture != null, "waitForInput can only be called after run"); - TezRawKeyValueIterator kvIter; + TezRawKeyValueIterator kvIter = null; try { kvIter = runShuffleFuture.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause instanceof IOException) { - throw (IOException) cause; - } else if (cause instanceof InterruptedException) { - throw (InterruptedException) cause; - } else { - throw new TezUncheckedException( - "Unexpected exception type while running Shuffle and Merge", cause); - } + handleThrowable(cause); + } + if (isShutDown.get()) { + throw new InputAlreadyClosedException(); + } + if (throwable != null) { + handleThrowable(throwable); } return kvIter; } public void run() throws IOException { merger.configureAndStart(); - RunShuffleCallable runShuffle = new RunShuffleCallable(); - runShuffleFuture = new FutureTask(runShuffle); - new Thread(runShuffleFuture, "ShuffleMergeRunner [" - + TezUtils.cleanVertexName(inputContext.getSourceVertexName() + "]")).start(); + runShuffleFuture = executor.submit(runShuffleCallable); + Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback()); + executor.shutdown(); } - + + public void shutdown() { + if (!isShutDown.getAndSet(true)) { + // Interrupt so that the scheduler / merger sees this interrupt. + LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed); + runShuffleFuture.cancel(true); + cleanupIgnoreErrors(); + } + } + private class RunShuffleCallable implements Callable { @Override public TezRawKeyValueIterator call() throws IOException, InterruptedException { - // TODO NEWTEZ Limit # fetchers to number of inputs - final int numFetchers = - conf.getInt( - TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, - TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES); - Fetcher[] fetchers = new Fetcher[numFetchers]; - for (int i = 0; i < numFetchers; ++i) { - fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, - Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength, - codec, inputContext); - - fetchers[i].start(); + + synchronized (this) { + for (int i = 0; i < numFetchers; ++i) { + Fetcher fetcher = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, + ifileReadAhead, ifileReadAheadLength, codec, inputContext); + fetchers.add(fetcher); + fetcher.start(); + } } + while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) { synchronized (this) { if (throwable != null) { - inputContext.fatalError(throwable, "error in shuffle from thread :" - + throwingThreadName); throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } @@ -257,13 +320,10 @@ public class Shuffle implements ExceptionReporter { } // Stop the map-output fetcher threads - for (Fetcher fetcher : fetchers) { - fetcher.shutDown(); - } - fetchers = null; + cleanupFetchers(false); // stop the scheduler - scheduler.close(); + cleanupShuffleScheduler(false); // Finish the on-going merges... @@ -271,15 +331,12 @@ public class Shuffle implements ExceptionReporter { try { kvIter = merger.close(); } catch (Throwable e) { - inputContext.fatalError(e, "Error while doing final merge "); throw new ShuffleError("Error while doing final merge " , e); } // Sanity check synchronized (Shuffle.this) { if (throwable != null) { - inputContext.fatalError(throwable, "error in shuffle from thread :" - + throwingThreadName); throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } @@ -291,6 +348,73 @@ public class Shuffle implements ExceptionReporter { } } + private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException { + // Stop the fetcher threads + InterruptedException ie = null; + if (!fetchersClosed.getAndSet(true)) { + for (Fetcher fetcher : fetchers) { + try { + fetcher.shutDown(); + } catch (InterruptedException e) { + if (ignoreErrors) { + LOG.info("Interrupted while shutting down fetchers. Ignoring."); + } else { + if (ie != null) { + ie = e; + } else { + LOG.warn("Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown " + + e); + } + } + } + } + fetchers.clear(); + // throw only the first exception while attempting to shutdown. + if (ie != null) { + throw ie; + } + } + } + + private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException { + + if (!schedulerClosed.getAndSet(true)) { + try { + scheduler.close(); + } catch (InterruptedException e) { + if (ignoreErrors) { + LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring"); + } else { + throw e; + } + } + } + } + + private void cleanupMerger(boolean ignoreErrors) throws Throwable { + if (!mergerClosed.getAndSet(true)) { + try { + merger.close(); + } catch (Throwable e) { + if (ignoreErrors) { + LOG.info("Exception while trying to shutdown merger, Ignoring", e); + } else { + throw e; + } + } + } + } + + private void cleanupIgnoreErrors() { + try { + cleanupFetchers(true); + cleanupShuffleScheduler(true); + cleanupMerger(true); + } catch (Throwable t) { + // Ignore + } + } + @Private public synchronized void reportException(Throwable t) { if (throwable == null) { @@ -316,4 +440,23 @@ public class Shuffle implements ExceptionReporter { public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory); } + + private class ShuffleRunnerFutureCallback implements FutureCallback { + @Override + public void onSuccess(TezRawKeyValueIterator result) { + LOG.info("Shuffle Runner thread complete"); + } + + @Override + public void onFailure(Throwable t) { + // ZZZ Handle failures during shutdown. + if (isShutDown.get()) { + LOG.info("Already shutdown. Ignoring error: ", t); + } else { + LOG.error("ShuffleRunner failed with error", t); + inputContext.fatalError(t, "Shuffle Runner Failed"); + cleanupIgnoreErrors(); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java index 45a50ad..41ce489 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java @@ -570,6 +570,7 @@ class ShuffleScheduler { } public void close() throws InterruptedException { + /// ZZZ need to interrupt setlf ? referee.interrupt(); referee.join(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java new file mode 100644 index 0000000..ee41066 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.exceptions; + + +public class InputAlreadyClosedException extends RuntimeException { + + private static final long serialVersionUID = 5094990552896724803L; + + public InputAlreadyClosedException() { + super("Input already closed"); + } + + public InputAlreadyClosedException(Throwable cause) { + super("Input already closed", cause); + } + + public InputAlreadyClosedException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java index 0c01f92..7045b5c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java @@ -127,8 +127,10 @@ public class ShuffledMergedInput extends AbstractLogicalInput { * @return true if the input is ready for consumption, or if an error occurred * processing fetching the input. false if the shuffle and merge are * still in progress + * @throws InterruptedException + * @throws IOException */ - public synchronized boolean isInputReady() { + public synchronized boolean isInputReady() throws IOException, InterruptedException { Preconditions.checkState(isStarted.get(), "Must start input before invoking this method"); if (getNumPhysicalInputs() == 0) { return true; @@ -164,6 +166,9 @@ public class ShuffledMergedInput extends AbstractLogicalInput { if (this.getNumPhysicalInputs() != 0 && rawIter != null) { rawIter.close(); } + if (shuffle != null) { + shuffle.shutdown(); + } return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java index b2ac9dd..9ecf590 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.net.URLConnection; -import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -32,15 +30,13 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.crypto.SecretKey; -import javax.net.ssl.HttpsURLConnection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -76,6 +72,10 @@ public class Fetcher implements Callable { private final FetcherCallback fetcherCallback; private final FetchedInputAllocator inputManager; private final ApplicationId appId; + + private final String logIdentifier; + + private final AtomicBoolean isShutDown = new AtomicBoolean(false); private static boolean sslShuffle = false; private static SSLFactory sslFactory; @@ -94,12 +94,14 @@ public class Fetcher implements Callable { private LinkedHashSet remaining; private URL url; + private volatile HttpURLConnection connection; + private volatile DataInputStream input; private String encHash; private String msgToEncode; private Fetcher(FetcherCallback fetcherCallback, FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret, - Configuration conf) { + String srcNameTrimmed) { this.fetcherCallback = fetcherCallback; this.inputManager = inputManager; this.shuffleSecret = shuffleSecret; @@ -107,6 +109,7 @@ public class Fetcher implements Callable { this.pathToAttemptMap = new HashMap(); this.fetcherIdentifier = fetcherIdGen.getAndIncrement(); + this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier; // TODO NEWTEZ Ideally, move this out from here into a static initializer block. // Re-enable when ssl shuffle support is needed. @@ -141,40 +144,62 @@ public class Fetcher implements Callable { remaining = new LinkedHashSet(srcAttempts); - HttpURLConnection connection; try { - connection = connectToShuffleHandler(host, port, partition, srcAttempts); + connectToShuffleHandler(host, port, partition, srcAttempts); } catch (IOException e) { // ioErrs.increment(1); // If connect did not succeed, just mark all the maps as failed, // indirectly penalizing the host - for (Iterator leftIter = remaining.iterator(); leftIter - .hasNext();) { - fetcherCallback.fetchFailed(host, leftIter.next(), true); + if (isShutDown.get()) { + LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown"); + } else { + for (Iterator leftIter = remaining.iterator(); leftIter + .hasNext();) { + fetcherCallback.fetchFailed(host, leftIter.next(), true); + } } return new FetchResult(host, port, partition, remaining); } - - DataInputStream input; + if (isShutDown.get()) { + // shutdown would have no effect if in the process of establishing the connection. + shutdownInternal(); + LOG.info("Detected fetcher has been shutdown after connection establishment. Returning"); + return new FetchResult(host, port, partition, remaining); + } try { input = new DataInputStream(connection.getInputStream()); - validateConnectionResponse(connection, url, msgToEncode, encHash); + validateConnectionResponse(msgToEncode, encHash); } catch (IOException e) { // ioErrs.increment(1); // If we got a read error at this stage, it implies there was a problem // with the first map, typically lost map. So, penalize only that map // and add the rest - InputAttemptIdentifier firstAttempt = srcAttempts.get(0); - LOG.warn("Fetch Failure from host while connecting: " + host - + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e); - fetcherCallback.fetchFailed(host, firstAttempt, false); - return new FetchResult(host, port, partition, remaining); + if (isShutDown.get()) { + LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown"); + } else { + InputAttemptIdentifier firstAttempt = srcAttempts.get(0); + LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt + + " Informing ShuffleManager: ", e); + fetcherCallback.fetchFailed(host, firstAttempt, false); + return new FetchResult(host, port, partition, remaining); + } } // By this point, the connection is setup and the response has been // validated. + // Handle any shutdown which may have been invoked. + if (isShutDown.get()) { + // shutdown would have no effect if in the process of establishing the connection. + shutdownInternal(); + LOG.info("Detected fetcher has been shutdown after opening stream. Returning"); + return new FetchResult(host, port, partition, remaining); + } + // After this point, closing the stream and connection, should cause a + // SocketException, + // which will be ignored since shutdown has been invoked. + // Loop through available map-outputs and fetch them // On any error, faildTasks is not null and we exit // after putting back the remaining maps to the @@ -191,7 +216,7 @@ public class Fetcher implements Callable { } } - IOUtils.cleanup(LOG, input); + shutdown(); // Sanity check if (failedInputs == null && !remaining.isEmpty()) { @@ -203,6 +228,36 @@ public class Fetcher implements Callable { } + public void shutdown() { + if (!isShutDown.getAndSet(true)) { + shutdownInternal(); + } + } + + private void shutdownInternal() { + // Synchronizing on isShutDown to ensure we don't run into a parallel close + // Can't synchronize on the main class itself since that would cause the + // shutdown request to block + synchronized (isShutDown) { + try { + if (input != null) { + LOG.info("Closing input on " + logIdentifier); + input.close(); + } + if (connection != null) { + LOG.info("Closing connection on " + logIdentifier); + connection.disconnect(); + } + } catch (IOException e) { + LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : " + + e.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(e); + } + } + } + } + private InputAttemptIdentifier[] fetchInputs(DataInputStream input) { FetchedInput fetchedInput = null; InputAttemptIdentifier srcAttemptId = null; @@ -285,6 +340,8 @@ public class Fetcher implements Callable { // metrics.successFetch(); return null; } catch (IOException ioe) { + // ZZZ Add some shutdown code here + // ZZZ Make sure any assigned memory inputs are aborted // ioErrs.increment(1); if (srcAttemptId == null || fetchedInput == null) { LOG.info("fetcher" + " failed to read map header" + srcAttemptId @@ -360,11 +417,11 @@ public class Fetcher implements Callable { } } - private HttpURLConnection connectToShuffleHandler(String host, int port, + private void connectToShuffleHandler(String host, int port, int partition, List inputs) throws IOException { try { this.url = constructInputURL(host, port, partition, inputs); - HttpURLConnection connection = openConnection(url); + this.connection = openConnection(url); // generate hash of the url this.msgToEncode = SecureShuffleUtils.buildMsgFrom(url); @@ -382,8 +439,7 @@ public class Fetcher implements Callable { connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); - connect(connection, connectionTimeout); - return connection; + connect(connectionTimeout); } catch (IOException e) { LOG.warn("Failed to connect to " + host + " with " + srcAttempts.size() + " inputs", e); @@ -391,8 +447,7 @@ public class Fetcher implements Callable { } } - private void validateConnectionResponse(HttpURLConnection connection, - URL url, String msgToEncode, String encHash) throws IOException { + private void validateConnectionResponse(String msgToEncode, String encHash) throws IOException { int rc = connection.getResponseCode(); if (rc != HttpURLConnection.HTTP_OK) { throw new IOException("Got invalid response code " + rc + " from " + url @@ -422,17 +477,8 @@ public class Fetcher implements Callable { } protected HttpURLConnection openConnection(URL url) throws IOException { - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - if (sslShuffle) { - HttpsURLConnection httpsConn = (HttpsURLConnection) conn; - try { - httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); - } catch (GeneralSecurityException ex) { - throw new IOException(ex); - } - httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier()); - } - return conn; + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + return connection; } /** @@ -440,8 +486,7 @@ public class Fetcher implements Callable { * only on the last failure. Instead of connecting with a timeout of X, we try * connecting with a timeout of x < X but multiple times. */ - private void connect(URLConnection connection, int connectionTimeout) - throws IOException { + private void connect(int connectionTimeout) throws IOException { int unit = 0; if (connectionTimeout < 0) { throw new IOException("Invalid timeout " + "[timeout = " @@ -456,6 +501,13 @@ public class Fetcher implements Callable { connection.connect(); break; } catch (IOException ioe) { + + // Check if already shutdown and abort subsequent connect attempts + if (isShutDown.get()) { + LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: [" + + ioe.getClass().getName() + ", " + ioe.getMessage() + "]"); + return; + } // update the total remaining connect-timeout connectionTimeout -= unit; @@ -503,9 +555,9 @@ public class Fetcher implements Callable { public FetcherBuilder(FetcherCallback fetcherCallback, FetchedInputAllocator inputManager, ApplicationId appId, - SecretKey shuffleSecret, Configuration conf) { + SecretKey shuffleSecret, String srcNameTrimmed) { this.fetcher = new Fetcher(fetcherCallback, inputManager, appId, - shuffleSecret, conf); + shuffleSecret, srcNameTrimmed); } public FetcherBuilder setCompressionParameters(CompressionCodec codec) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java index 050c0c0..657feed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java @@ -82,14 +82,12 @@ public class ShuffleManager implements FetcherCallback { private static final Log LOG = LogFactory.getLog(ShuffleManager.class); private final TezInputContext inputContext; - private final Configuration conf; private final int numInputs; private final FetchedInputAllocator inputManager; private final ListeningExecutorService fetcherExecutor; - private final ExecutorService schedulerRawExecutor; private final ListeningExecutorService schedulerExecutor; private final RunShuffleCallable schedulerCallable = new RunShuffleCallable(); @@ -99,6 +97,7 @@ public class ShuffleManager implements FetcherCallback { private final ConcurrentMap knownSrcHosts; private final BlockingQueue pendingHosts; private final Set obsoletedInputs; + private Set runningFetchers; private final AtomicInteger numCompletedInputs = new AtomicInteger(0); @@ -110,7 +109,6 @@ public class ShuffleManager implements FetcherCallback { private final Condition wakeLoop = lock.newCondition(); private final int numFetchers; - private final AtomicInteger numRunningFetchers = new AtomicInteger(0); // Parameters required by Fetchers private final SecretKey shuffleSecret; @@ -122,8 +120,8 @@ public class ShuffleManager implements FetcherCallback { private final boolean ifileReadAhead; private final int ifileReadAheadLength; - private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback(); - + private final String srcNameTrimmed; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final TezCounter shuffledInputsCounter; @@ -141,7 +139,6 @@ public class ShuffleManager implements FetcherCallback { int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength, CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException { this.inputContext = inputContext; - this.conf = conf; this.numInputs = numInputs; this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS); @@ -156,13 +153,16 @@ public class ShuffleManager implements FetcherCallback { this.ifileReadAheadLength = ifileReadAheadLength; this.codec = codec; this.inputManager = inputAllocator; + + this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName()); completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap(numInputs)); completedInputs = new LinkedBlockingQueue(numInputs); knownSrcHosts = new ConcurrentHashMap(); pendingHosts = new LinkedBlockingQueue(); obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap()); - + runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap()); + int maxConfiguredFetchers = conf.getInt( TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, @@ -172,20 +172,13 @@ public class ShuffleManager implements FetcherCallback { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool( numFetchers, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat( - "Fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #%d") - .build()); + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); - this.schedulerRawExecutor = Executors.newFixedThreadPool( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat( - "ShuffleRunner [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]") - .build()); + + ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build()); this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor); this.startTime = System.currentTimeMillis(); @@ -224,7 +217,7 @@ public class ShuffleManager implements FetcherCallback { while (!isShutdown.get() && numCompletedInputs.get() < numInputs) { lock.lock(); try { - if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) { + if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) { if (numCompletedInputs.get() < numInputs) { wakeLoop.await(); } @@ -242,12 +235,12 @@ public class ShuffleManager implements FetcherCallback { if (LOG.isDebugEnabled()) { LOG.debug("NumCompletedInputs: " + numCompletedInputs); } - if (numCompletedInputs.get() < numInputs) { + if (numCompletedInputs.get() < numInputs && !isShutdown.get()) { lock.lock(); try { - int maxFetchersToRun = numFetchers - numRunningFetchers.get(); + int maxFetchersToRun = numFetchers - runningFetchers.size(); int count = 0; - while (pendingHosts.peek() != null) { + while (pendingHosts.peek() != null && !isShutdown.get()) { InputHost inputHost = null; try { inputHost = pendingHosts.take(); @@ -262,16 +255,16 @@ public class ShuffleManager implements FetcherCallback { if (LOG.isDebugEnabled()) { LOG.debug("Processing pending host: " + inputHost.toDetailedString()); } - if (inputHost.getNumPendingInputs() > 0) { + if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) { LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier()); Fetcher fetcher = constructFetcherForHost(inputHost); - numRunningFetchers.incrementAndGet(); + runningFetchers.add(fetcher); if (isShutdown.get()) { LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); } ListenableFuture future = fetcherExecutor .submit(fetcher); - Futures.addCallback(future, fetchFutureCallback); + Futures.addCallback(future, new FetchFutureCallback(fetcher)); if (++count >= maxFetchersToRun) { break; } @@ -297,9 +290,8 @@ public class ShuffleManager implements FetcherCallback { } private Fetcher constructFetcherForHost(InputHost inputHost) { - FetcherBuilder fetcherBuilder = new FetcherBuilder( - ShuffleManager.this, inputManager, - inputContext.getApplicationId(), shuffleSecret, conf); + FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, inputManager, + inputContext.getApplicationId(), shuffleSecret, srcNameTrimmed); fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout); if (codec != null) { fetcherBuilder.setCompressionParameters(codec); @@ -517,15 +509,29 @@ public class ShuffleManager implements FetcherCallback { /////////////////// End of Methods from FetcherCallbackHandler public void shutdown() throws InterruptedException { - isShutdown.set(true); - if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) { - this.schedulerExecutor.shutdownNow(); // Interrupt all running fetchers - } - if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) { - this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers + if (!isShutdown.getAndSet(true)) { + // Shut down any pending fetchers + LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": " + + runningFetchers.size()); + lock.lock(); + try { + wakeLoop.signal(); // signal the fetch-scheduler + for (Fetcher fetcher : runningFetchers) { + fetcher.shutdown(); // This could be parallelized. + } + } finally { + lock.unlock(); + } + + if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) { + this.schedulerExecutor.shutdownNow(); + } + if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) { + this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers. + } } } - + private void registerCompletedInput(FetchedInput fetchedInput) { lock.lock(); try { @@ -653,10 +659,16 @@ public class ShuffleManager implements FetcherCallback { private class FetchFutureCallback implements FutureCallback { + private final Fetcher fetcher; + + public FetchFutureCallback(Fetcher fetcher) { + this.fetcher = fetcher; + } + private void doBookKeepingForFetcherComplete() { - numRunningFetchers.decrementAndGet(); lock.lock(); try { + runningFetchers.remove(fetcher); wakeLoop.signal(); } finally { lock.unlock(); @@ -665,20 +677,27 @@ public class ShuffleManager implements FetcherCallback { @Override public void onSuccess(FetchResult result) { - Iterable pendingInputs = result.getPendingInputs(); - if (pendingInputs != null && pendingInputs.iterator().hasNext()) { - InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort())); - assert inputHost != null; - for (InputAttemptIdentifier input : pendingInputs) { - inputHost.addKnownInput(input); + fetcher.shutdown(); + if (isShutdown.get()) { + LOG.info("Already shutdown. Ignoring event from fetcher"); + } else { + Iterable pendingInputs = result.getPendingInputs(); + if (pendingInputs != null && pendingInputs.iterator().hasNext()) { + InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort())); + assert inputHost != null; + for (InputAttemptIdentifier input : pendingInputs) { + inputHost.addKnownInput(input); + } + pendingHosts.add(inputHost); } - pendingHosts.add(inputHost); + doBookKeepingForFetcherComplete(); } - doBookKeepingForFetcherComplete(); } @Override public void onFailure(Throwable t) { + // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down. + fetcher.shutdown(); if (isShutdown.get()) { LOG.info("Already shutdown. Ignoring error from fetcher: " + t); } else {