Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3767517FFE for ; Fri, 16 Jan 2015 00:51:15 +0000 (UTC) Received: (qmail 7340 invoked by uid 500); 16 Jan 2015 00:51:17 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 7305 invoked by uid 500); 16 Jan 2015 00:51:17 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 7296 invoked by uid 99); 16 Jan 2015 00:51:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Jan 2015 00:51:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C6A7E07D6; Fri, 16 Jan 2015 00:51:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <3d7036cf8f58479ebe7a6ebb940ae3e5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-1962. Fix a thread leak in LocalMode. (sseth) Date: Fri, 16 Jan 2015 00:51:15 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.6 0e872a5dc -> 8bf99a8df TEZ-1962. Fix a thread leak in LocalMode. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8bf99a8d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8bf99a8d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8bf99a8d Branch: refs/heads/branch-0.6 Commit: 8bf99a8df80443ddc36916d21bea2b2c3c1e1013 Parents: 0e872a5 Author: Siddharth Seth Authored: Thu Jan 15 16:50:49 2015 -0800 Committer: Siddharth Seth Committed: Thu Jan 15 16:50:49 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/launcher/LocalContainerLauncher.java | 72 +++++++++++++------- .../org/apache/tez/runtime/task/TezChild.java | 34 +++++---- 3 files changed, 71 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8bf99a8d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6bdc3dc..452804f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -93,6 +93,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-1962. Fix a thread leak in LocalMode. TEZ-1924. Tez AM does not register with AM with full FQDN causing jobs to fail in some environments. TEZ-1878. Task-specific log level override not working in certain conditions. http://git-wip-us.apache.org/repos/asf/tez/blob/8bf99a8d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index f14fd5d..bd4996b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -182,6 +182,17 @@ public class LocalContainerLauncher extends AbstractService implements context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message)); } + private void handleLaunchFailed(Throwable t, ContainerId containerId) { + String message; + if (t instanceof RejectedExecutionException) { + message = "Failed to queue container launch for container Id: " + containerId; + } else { + message = "Failed to launch container for container Id: " + containerId; + } + LOG.error(message, t); + sendContainerLaunchFailedMsg(containerId, message); + } + //launch tasks private void launch(NMCommunicatorLaunchRequestEvent event) { @@ -191,19 +202,29 @@ public class LocalContainerLauncher extends AbstractService implements TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); try { + TezChild tezChild; + try { + tezChild = + createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier, + context.getApplicationAttemptId().getAttemptId(), localDirs, + (TezTaskUmbilicalProtocol) taskAttemptListener); + } catch (InterruptedException e) { + handleLaunchFailed(e, event.getContainerId()); + return; + } catch (TezException e) { + handleLaunchFailed(e, event.getContainerId()); + return; + } catch (IOException e) { + handleLaunchFailed(e, event.getContainerId()); + return; + } ListenableFuture runningTaskFuture = - taskExecutorService.submit(createSubTask(context.getAMConf(), - event.getContainerId(), tokenIdentifier, - context.getApplicationAttemptId().getAttemptId(), - localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener)); + taskExecutorService.submit(createSubTask(tezChild, event.getContainerId())); runningContainers.put(event.getContainerId(), runningTaskFuture); - Futures - .addCallback(runningTaskFuture, new RunningTaskCallback(context, event.getContainerId()), - callbackExecutor); + Futures.addCallback(runningTaskFuture, + new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor); } catch (RejectedExecutionException e) { - String message = "Failed to queue container launch for container Id: " + event.getContainerId(); - LOG.error(message, e); - sendContainerLaunchFailedMsg(event.getContainerId(), message); + handleLaunchFailed(e, event.getContainerId()); } } @@ -228,10 +249,12 @@ public class LocalContainerLauncher extends AbstractService implements private final AppContext appContext; private final ContainerId containerId; + private final TezChild tezChild; - RunningTaskCallback(AppContext appContext, ContainerId containerId) { + RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) { this.appContext = appContext; this.containerId = containerId; + this.tezChild = tezChild; } @Override @@ -257,6 +280,7 @@ public class LocalContainerLauncher extends AbstractService implements @Override public void onFailure(Throwable t) { runningContainers.remove(containerId); + tezChild.shutdown(); // Ignore CancellationException since that is triggered by the LocalContainerLauncher itself if (!(t instanceof CancellationException)) { LOG.info("Container: " + containerId + ": Execution Failed: ", t); @@ -278,12 +302,7 @@ public class LocalContainerLauncher extends AbstractService implements //create a SubTask private synchronized Callable createSubTask( - final Configuration defaultConf, - final ContainerId containerId, - final String tokenIdentifier, - final int attemptNumber, - final String[] localDirs, - final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) { + final TezChild tezChild, final ContainerId containerId) { return new Callable() { @Override @@ -296,16 +315,23 @@ public class LocalContainerLauncher extends AbstractService implements context.getApplicationAttemptId()); context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt)); - // Pull in configuration specified for the session. - TezChild tezChild = - TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, - attemptNumber, localDirs, workingDirectory); - tezChild.setUmbilical(tezTaskUmbilicalProtocol); return tezChild.run(); } }; } + private TezChild createTezChild(Configuration defaultConf, ContainerId containerId, + String tokenIdentifier, int attemptNumber, String[] localDirs, + TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) throws + InterruptedException, TezException, IOException { + + TezChild tezChild = + TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, + attemptNumber, localDirs, workingDirectory); + tezChild.setUmbilical(tezTaskUmbilicalProtocol); + return tezChild; + } + @Override public void handle(NMCommunicatorEvent event) { try { @@ -314,4 +340,4 @@ public class LocalContainerLauncher extends AbstractService implements throw new TezUncheckedException(e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/tez/blob/8bf99a8d/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java index 05daf5a..3631ca5 100644 --- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -95,6 +96,7 @@ public class TezChild { private final ListeningExecutorService executor; private final ObjectRegistryImpl objectRegistry; private final Map serviceConsumerMetadata = new HashMap(); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private Multimap startedInputsMap = HashMultimap.create(); @@ -179,20 +181,24 @@ public class TezChild { TezUtilsInternal.updateLoggers(""); } ListenableFuture getTaskFuture = executor.submit(containerReporter); + boolean error = false; ContainerTask containerTask = null; try { containerTask = getTaskFuture.get(); } catch (ExecutionException e) { + error = true; Throwable cause = e.getCause(); - handleError(cause); return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, cause, "Execution Exception while fetching new work: " + e.getMessage()); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for new work:" - + containerTask.getTaskSpec().getTaskAttemptID()); - handleError(e); + error = true; + LOG.info("Interrupted while waiting for new work"); return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e, "Interrupted while waiting for new work"); + } finally { + if (error) { + shutdown(); + } } if (containerTask.shouldDie()) { LOG.info("ContainerTask returned shouldDie=true, Exiting"); @@ -323,15 +329,17 @@ public class TezChild { lastVertexID = newVertexID; } - private void shutdown() { - executor.shutdownNow(); - if (taskReporter != null) { - taskReporter.shutdown(); - } - RPC.stopProxy(umbilical); - DefaultMetricsSystem.shutdown(); - if (!isLocal) { - LogManager.shutdown(); + public void shutdown() { + if (!isShutdown.getAndSet(true)) { + executor.shutdownNow(); + if (taskReporter != null) { + taskReporter.shutdown(); + } + DefaultMetricsSystem.shutdown(); + if (!isLocal) { + RPC.stopProxy(umbilical); + LogManager.shutdown(); + } } }