Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E74621846C for ; Sat, 22 Aug 2015 07:26:00 +0000 (UTC) Received: (qmail 98896 invoked by uid 500); 22 Aug 2015 07:26:00 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 98808 invoked by uid 500); 22 Aug 2015 07:26:00 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 98623 invoked by uid 99); 22 Aug 2015 07:26:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 07:26:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9AB69E060C; Sat, 22 Aug 2015 07:26:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 07:26:07 -0000 Message-Id: In-Reply-To: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> References: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/50] [abbrv] tez git commit: TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan. TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. Contributed by Rajesh Balamohan. 
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6dabd8e9 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6dabd8e9 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6dabd8e9 Branch: refs/heads/master Commit: 6dabd8e992c1ace91b3612d39426f3fbd2d8fe2f Parents: 8ba7e44 Author: Siddharth Seth Authored: Wed May 6 00:39:46 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:13:54 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../runtime/LogicalIOProcessorRuntimeTask.java | 83 ++++++++++++++++++-- .../org/apache/tez/runtime/RuntimeTask.java | 5 ++ .../apache/tez/runtime/task/TezTaskRunner.java | 71 ++++++++++++++++- 4 files changed, 152 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6dabd8e9/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9fc9ed3..f8a71e8 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -20,5 +20,6 @@ ALL CHANGES: TEZ-2361. Propagate dag completion to TaskCommunicator. TEZ-2381. Fixes after rebase 04/28. TEZ-2388. Send dag identifier as part of the fetcher request string. + TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/6dabd8e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 84e5e0d..8263b3f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -20,6 +20,9 @@ package org.apache.tez.runtime; import java.io.Closeable; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -41,6 +44,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.base.Throwables; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; @@ -174,6 +178,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.runInputMap = new LinkedHashMap(); this.runOutputMap = new LinkedHashMap(); + this.initializedInputs = new ConcurrentHashMap(); + this.initializedOutputs = new ConcurrentHashMap(); + this.processorDescriptor = taskSpec.getProcessorDescriptor(); this.serviceConsumerMetadata = serviceConsumerMetadata; this.envMap = envMap; @@ -420,6 +427,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { taskSpec.getTaskAttemptID()); initializedInputs.put(edgeName, input); LOG.info("Initialized Input with src edge: " + edgeName); + 
initializedInputs.put(edgeName, input); return null; } } @@ -469,6 +477,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); initializedOutputs.put(edgeName, output); LOG.info("Initialized Output with dest edge: " + edgeName); + initializedOutputs.put(edgeName, output); return null; } } @@ -694,6 +703,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventsToBeProcessed.addAll(events); } + @Override + public synchronized void abortTask() throws Exception { + if (processor != null) { + processor.abort(); + } + } + private void startRouterThread() { eventRouterThread = new Thread(new RunnableWithNdc() { public void runInternal() { @@ -713,6 +729,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { if (!isTaskDone()) { LOG.warn("Event Router thread interrupted. Returning."); } + Thread.currentThread().interrupt(); return; } } @@ -724,6 +741,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventRouterThread.start(); } + private void maybeResetInterruptStatus() { + if (!Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + } + } + private void closeContexts() throws IOException { closeContext(inputContextMap); closeContext(outputContextMap); @@ -763,6 +786,18 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } // Close the unclosed IPO + /** + * Cleanup IPO that are not closed. In case, regular close() has happened in IPO, they + * would not be available in the IPOs to be cleaned. So this is safe. + * + * e.g whenever input gets closed() in normal way, it automatically removes it from + * initializedInputs map. + * + * In case any exception happens in processor close or IO close, it wouldn't be removed from + * the initialized IO data structures and here is the chance to close them and release + * resources. 
+ * + */ if (LOG.isDebugEnabled()) { LOG.debug("Processor closed={}", processorClosed); LOG.debug("Num of inputs to be closed={}", initializedInputs.size()); @@ -773,10 +808,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { try { processorClosed = true; processor.close(); - LOG.info("Closed processor for vertex={}, index={}", + LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", processor .getContext().getTaskVertexName(), - processor.getContext().getTaskVertexIndex()); + processor.getContext().getTaskVertexIndex(), + Thread.currentThread().isInterrupted()); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt for processor"); + Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}", @@ -792,13 +833,19 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputIterator.remove(); try { ((InputFrameworkInterface)entry.getValue()).close(); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for input with srcVertexName={}", + srcVertexName); + Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( "Ignoring exception when closing input {}(cleanup). 
Exception class={}, message={}", srcVertexName, e.getClass().getName(), e.getMessage()); } finally { - LOG.info("Close input for vertex={}, sourceVertex={}", processor - .getContext().getTaskVertexName(), srcVertexName); + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted()); } } @@ -810,16 +857,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputIterator.remove(); try { ((OutputFrameworkInterface) entry.getValue()).close(); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt status for output with destVertexName={}", + destVertexName); + Thread.currentThread().interrupt(); } catch (Throwable e) { LOG.warn( "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", destVertexName, e.getClass().getName(), e.getMessage()); } finally { - LOG.info("Close input for vertex={}, sourceVertex={}", processor - .getContext().getTaskVertexName(), destVertexName); + LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor + .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted()); } } + if (LOG.isDebugEnabled()) { + printThreads(); + } + try { closeContexts(); // Cleanup references which may be held by misbehaved tasks. 
@@ -867,6 +924,20 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputReadyTracker = null; objectRegistry = null; } + + + /** + * Print all threads in JVM (only for debugging) + */ + void printThreads() { + //Print the status of all threads in JVM + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadMXBean.getAllThreadIds(); + for (Long id : threadIds) { + ThreadInfo threadInfo = threadMXBean.getThreadInfo(id); + LOG.info("ThreadId : " + id + ", name=" + threadInfo.getThreadName()); + } + } @Private @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/6dabd8e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 17d7053..cdfb46a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -76,6 +76,10 @@ public abstract class RuntimeTask { protected final AtomicReference state = new AtomicReference(); + public boolean isRunning() { + return (state.get() == State.RUNNING); + } + public TezCounters addAndGetTezCounter(String name) { TezCounters counter = new TezCounters(); counterMap.put(name, counter); @@ -163,4 +167,5 @@ public abstract class RuntimeTask { taskDone.set(true); } + public abstract void abortTask() throws Exception; } http://git-wip-us.apache.org/repos/asf/tez/blob/6dabd8e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java 
index 33a7f4a..7238d5e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -25,8 +25,13 @@ import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Throwables; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSError; @@ -35,6 +40,7 @@ import org.apache.tez.common.CallableWithNdc; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.impl.EventMetaData; @@ -61,6 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final ListeningExecutorService executor; private volatile ListenableFuture taskFuture; private volatile Thread waitingThread; + private volatile Thread taskRunner; private volatile Throwable firstException; // Effectively a duplicate check, since hadFatalError does the same thing. 
@@ -96,7 +103,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { taskReporter.registerTask(task, this); TaskRunnerCallable callable = new TaskRunnerCallable(); Throwable failureCause = null; - taskFuture = executor.submit(callable); + if (!Thread.currentThread().isInterrupted()) { + taskFuture = executor.submit(callable); + return isShutdownRequested(); + } try { taskFuture.get(); @@ -158,6 +168,10 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } } } + return isShutdownRequested(); + } + + private boolean isShutdownRequested() { if (shutdownRequested.get()) { LOG.info("Shutdown requested... returning"); return false; @@ -173,11 +187,14 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { @Override public Void run() throws Exception { try { + taskRunner = Thread.currentThread(); LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID()); task.initialize(); if (!Thread.currentThread().isInterrupted() && firstException == null) { LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID()); task.run(); + maybeInterruptWaitingThread(); + LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID()); task.close(); task.setFrameworkCounters(); @@ -199,6 +216,12 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } return null; } catch (Throwable cause) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("TaskRunnerCallable interrupted=" + Thread.currentThread().isInterrupted() + + ", shutdownRequest=" + shutdownRequested.get()); + Thread.currentThread().interrupt(); + return null; + } if (cause instanceof FSError) { // Not immediately fatal, this is an error reported by Hadoop FileSystem maybeRegisterFirstException(cause); @@ -255,6 +278,17 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { taskRunning.set(false); } } + + private void maybeInterruptWaitingThread() { + /** + * Possible that the processor is swallowing 
InterruptException of taskRunner.interrupt(). + * In such case, interrupt the waitingThread based on the shutdownRequested flag, so that + * entire task gets cancelled. + */ + if (shutdownRequested.get()) { + waitingThread.interrupt(); + } + } } // should wait until all messages are sent to AM before TezChild shutdown @@ -353,10 +387,43 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { } } + private void abortRunningTask() { + if (!taskRunning.get()) { + LOG.info("Task is not running"); + waitingThread.interrupt(); + return; + } + + if (taskRunning.get()) { + try { + task.abortTask(); + } catch (Exception e) { + LOG.warn("Error when aborting the task", e); + try { + sendFailure(e, "Error when aborting the task"); + } catch (Exception ignored) { + // Ignored. + } + } + } + //Interrupt the relevant threads. TaskRunner should be interrupted preferably. + if (isTaskRunning()) { + LOG.info("Interrupting taskRunner=" + taskRunner.getName()); + taskRunner.interrupt(); + } else { + LOG.info("Interrupting waitingThread=" + waitingThread.getName()); + waitingThread.interrupt(); + } + } + + private boolean isTaskRunning() { + return (taskRunning.get() && task.isRunning()); + } + @Override public void shutdownRequested() { shutdownRequested.set(true); - waitingThread.interrupt(); + abortRunningTask(); } private String getTaskDiagnosticsString(Throwable t, String message) {