Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0CD8518892 for ; Thu, 6 Aug 2015 09:25:58 +0000 (UTC) Received: (qmail 54675 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 54574 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 52990 invoked by uid 99); 6 Aug 2015 09:25:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Aug 2015 09:25:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ED960E6838; Thu, 6 Aug 2015 09:25:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 06 Aug 2015 09:26:18 -0000 Message-Id: <13d62cee2f644552874d2b8486a0ab1c@git.apache.org> In-Reply-To: <10a0859464864279b3a9f48415e4a976@git.apache.org> References: <10a0859464864279b3a9f48415e4a976@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/51] [abbrv] tez git commit: TEZ-2434. Allow tasks to be killed in the runtime. (sseth) TEZ-2434. Allow tasks to be killed in the runtime. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b51e271e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b51e271e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b51e271e Branch: refs/heads/TEZ-2003 Commit: b51e271ef2e039dd2911b1ba0fcf982a7c7954da Parents: d8fb6ad Author: Siddharth Seth Authored: Mon May 11 23:34:43 2015 -0700 Committer: Siddharth Seth Committed: Thu Aug 6 01:26:09 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/runtime/task/EndReason.java | 29 ++ .../tez/runtime/task/TaskRunner2Callable.java | 132 ++++++ .../tez/runtime/task/TaskRunner2Result.java | 48 ++ .../org/apache/tez/runtime/task/TezChild.java | 20 +- .../apache/tez/runtime/task/TezTaskRunner.java | 1 + .../apache/tez/runtime/task/TezTaskRunner2.java | 434 +++++++++++++++++++ 7 files changed, 655 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 66c110f..5d2e40a 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -24,5 +24,6 @@ ALL CHANGES: TEZ-2420. TaskRunner returning before executing the task. TEZ-2433. Fixes after rebase 05/08 TEZ-2438. tez-tools version in the branch is incorrect. + TEZ-2434. Allow tasks to be killed in the Runtime. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java new file mode 100644 index 0000000..8dc7a87 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +public enum EndReason { + SUCCESS(false), + CONTAINER_STOP_REQUESTED(false), + KILL_REQUESTED(true), + COMMUNICATION_FAILURE(false), + TASK_ERROR(false); + + private final boolean isActionable; + + EndReason(boolean isActionable) { + this.isActionable = isActionable; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java new file mode 100644 index 0000000..7315bbd --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.common.CallableWithNdc; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is responsible for running a {@link LogicalIOProcessorRuntimeTask}. + * It does not worry about reporting errors, heartbeats etc. + * + * Returns success / interrupt / failure status via it's return parameter. + * + * It's the responsibility of the invoker to handle whatever exceptions may be generated by this. + */ +public class TaskRunner2Callable extends CallableWithNdc { + + + + private static final Logger LOG = LoggerFactory.getLogger(TaskRunner2Callable.class); + + private final LogicalIOProcessorRuntimeTask task; + private final UserGroupInformation ugi; + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + + private volatile Thread ownThread; + + public TaskRunner2Callable(LogicalIOProcessorRuntimeTask task, + UserGroupInformation ugi) { + this.task = task; + this.ugi = ugi; + } + + @Override + public TaskRunner2CallableResult callInternal() throws Exception { + ownThread = Thread.currentThread(); + if (stopRequested.get()) { + return new TaskRunner2CallableResult(null); + } + try { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TaskRunner2CallableResult run() throws Exception { + if (stopRequested.get() || Thread.currentThread().isInterrupted()) { + return new TaskRunner2CallableResult(null); + } + LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID()); + task.initialize(); + + if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { + LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID()); + task.run(); + } else { + LOG.info("Stopped before running the processor."); + return new TaskRunner2CallableResult(null); + } + + if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { + LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID()); + task.close(); + task.setFrameworkCounters(); + } else { + LOG.info("Stopped before closing the processor"); + return new TaskRunner2CallableResult(null); + } + LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get()); + + + return new TaskRunner2CallableResult(null); + } + }); + } catch (Throwable t) { + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + return new TaskRunner2CallableResult(t); + } finally { + // If a stop was requested. Make sure the interrupt status is set during the cleanup. + + // One drawback of not communicating out from here is that task complete messages will only + // be sent out after cleanup is complete. + // For a successful task, however, this should be almost no delay since close has already happened. + maybeFixInterruptStatus(); + LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get()); + task.cleanup(); + } + } + + private void maybeFixInterruptStatus() { + if (stopRequested.get() && !Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + } + } + + + public void interruptTask() { + // Ensure the task is only interrupted once. + if (!stopRequested.getAndSet(true)) { + if (ownThread != null) { + ownThread.interrupt(); + } + } + } + + public static class TaskRunner2CallableResult { + final Throwable error; + + public TaskRunner2CallableResult(Throwable error) { + this.error = error; + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java new file mode 100644 index 0000000..07b32ce --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +public class TaskRunner2Result { + final EndReason endReason; + final Throwable error; + final boolean containerShutdownRequested; + + public TaskRunner2Result(EndReason endReason, Throwable error, boolean containerShutdownRequested) { + this.endReason = endReason; + this.error = error; + this.containerShutdownRequested = containerShutdownRequested; + } + + public EndReason getEndReason() { + return endReason; + } + + public Throwable getError() { + return error; + } + + public boolean isContainerShutdownRequested() { + return containerShutdownRequested; + } + + @Override + public String toString() { + return "TaskRunner2Result{" + + "endReason=" + endReason + + ", error=" + error + + ", containerShutdownRequested=" + containerShutdownRequested + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 4c8bebc..fff39a0 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -248,27 +248,27 @@ public class TezChild { cleanupOnTaskChanged(containerTask); // Execute the Actual Task - TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI, + TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI, localDirs, containerTask.getTaskSpec(), appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memAvailable); boolean shouldDie; try { - shouldDie = !taskRunner.run(); + TaskRunner2Result result = taskRunner.run(); + shouldDie = result.isContainerShutdownRequested(); if (shouldDie) { LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString); shutdown(); return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, "Asked to die by the AM"); } - } catch (IOException e) { - handleError(e); - return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); - } catch (TezException e) { - handleError(e); - return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); + if (result.getError() != null) { + Throwable e = result.getError(); + handleError(result.getError()); + return new ContainerExecutionResult( + ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, + e, "TaskExecutionFailure: " + e.getMessage()); + } } finally { FileSystem.closeAllForUGI(childUGI); } http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index dd4620a..a82d87b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -106,6 +106,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { if (!Thread.currentThread().isInterrupted()) { taskFuture = executor.submit(callable); } else { + taskReporter.unregisterTask(task.getTaskAttemptID()); return isShutdownRequested(); } try { http://git-wip-us.apache.org/repos/asf/tez/blob/b51e271e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java new file mode 100644 index 0000000..73e5c76 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -0,0 +1,434 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSError; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.task.TaskRunner2Callable.TaskRunner2CallableResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TezTaskRunner2 { + + + private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class); + + private final LogicalIOProcessorRuntimeTask task; + private final UserGroupInformation ugi; + + private final TaskReporterInterface taskReporter; + private final ListeningExecutorService executor; + private final UmbilicalAndErrorHandler umbilicalAndErrorHandler; + + // TODO It may be easier to model this as a state machine. + + // Indicates whether a kill has been requested. + private final AtomicBoolean killTaskRequested = new AtomicBoolean(false); + + // Indicates whether a stop container has been requested. + private final AtomicBoolean stopContainerRequested = new AtomicBoolean(false); + + // Indicates whether the task is complete. + private final AtomicBoolean taskComplete = new AtomicBoolean(false); + + // Separate flag from firstException, since an error can be reported without an exception. + private final AtomicBoolean errorSeen = new AtomicBoolean(false); + + private volatile EndReason firstEndReason = null; + + // The first exception which caused the task to fail. This could come in from the + // TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context. + private volatile Throwable firstException; + private volatile EventMetaData exceptionSourceInfo; + private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false); + + private boolean oobSignalErrorInProgress = false; + private final Lock oobSignalLock = new ReentrantLock(); + private final Condition oobSignalCondition = oobSignalLock.newCondition(); + + private volatile long taskKillStartTime = 0; + + // The callable which is being used to execute the task. + private volatile TaskRunner2Callable taskRunnerCallable; + + public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, + TaskSpec taskSpec, int appAttemptNumber, + Map serviceConsumerMetadata, + Map serviceProviderEnvMap, + Multimap startedInputsMap, + TaskReporterInterface taskReporter, ListeningExecutorService executor, + ObjectRegistry objectRegistry, String pid, + ExecutionContext executionContext, long memAvailable) throws + IOException { + this.ugi = ugi; + this.taskReporter = taskReporter; + this.executor = executor; + this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler(); + this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, + umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, + objectRegistry, pid, executionContext, memAvailable); + } + + /** + * Throws an exception only when there was a communication error reported by + * the TaskReporter. + * + * Otherwise, this takes care of all communication with the AM for a a running task - which + * includes informing the AM about Failures and Success. + * + * If a kill request is made to the task, it will not communicate this information to + * the AM - since a task KILL is an external event, and whoever invoked it should + * be able to track it. + * + * @return + */ + public TaskRunner2Result run() { + try { + ListenableFuture future = null; + synchronized (this) { + if (isRunningState()) { + // Safe to do this within a synchronized block because we're providing + // the handler on which the Reporter will communicate back. Assuming + // the register call doesn't end up hanging. + taskRunnerCallable = new TaskRunner2Callable(task, ugi); + taskReporter.registerTask(task, umbilicalAndErrorHandler); + future = executor.submit(taskRunnerCallable); + } + } + + if (future == null) { + return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + } + + TaskRunner2CallableResult executionResult = null; + // The task started. Wait for it to complete. + try { + executionResult = future.get(); + } catch (Throwable e) { + if (e instanceof ExecutionException) { + e = e.getCause(); + } + synchronized (this) { + if (isRunningState()) { + trySettingEndReason(EndReason.TASK_ERROR); + registerFirstException(e, null); + LOG.warn("Exception from RunnerCallable", e); + } + } + } + if (executionResult != null) { + synchronized (this) { + if (isRunningState()) { + if (executionResult.error != null) { + trySettingEndReason(EndReason.TASK_ERROR); + registerFirstException(executionResult.error, null); + } else { + trySettingEndReason(EndReason.SUCCESS); + taskComplete.set(true); + } + } + } + } + + switch (firstEndReason) { + case SUCCESS: + try { + taskReporter.taskSucceeded(task.getTaskAttemptID()); + return logAndReturnEndResult(EndReason.SUCCESS, null, stopContainerRequested.get()); + } catch (IOException e) { + // Comm failure. Task can't do much. + handleFinalStatusUpdateFailure(e, true); + return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get()); + } catch (TezException e) { + // Failure from AM. Task can't do much. + handleFinalStatusUpdateFailure(e, true); + return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get()); + } + case CONTAINER_STOP_REQUESTED: + // Don't need to send any more communication updates to the AM. + return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get()); + case KILL_REQUESTED: + // Kill is currently not reported to the AM via the TaskRunner. Fix this when the umbilical + // supports an indication of kill, if required. + return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get()); + case COMMUNICATION_FAILURE: + // Already seen a communication failure. There's no point trying to report another one. + return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + case TASK_ERROR: + // Don't report an error again if it was reported via signalFatalError + if (errorReporterToAm.get()) { + return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + } else { + String message; + if (firstException instanceof FSError) { + message = "Encountered an FSError while executing task: " + task.getTaskAttemptID(); + } else if (firstException instanceof Error) { + message = "Encountered an Error while executing task: " + task.getTaskAttemptID(); + } else { + message = "Failure while running task: " + task.getTaskAttemptID(); + } + try { + taskReporter.taskFailed(task.getTaskAttemptID(), firstException, message, exceptionSourceInfo); + return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + } catch (IOException e) { + // Comm failure. Task can't do much. + handleFinalStatusUpdateFailure(e, true); + return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + } catch (TezException e) { + // Failure from AM. Task can't do much. + handleFinalStatusUpdateFailure(e, true); + return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + } + } + default: + LOG.error("Unexpected EndReason. File a bug"); + return logAndReturnEndResult(EndReason.TASK_ERROR, new RuntimeException("Unexpected EndReason"), stopContainerRequested.get()); + + } + } finally { + // Clear the interrupted status of the blocking thread, in case it is set after the + // InterruptedException was invoked. + oobSignalLock.lock(); + try { + while (oobSignalErrorInProgress) { + try { + oobSignalCondition.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for OOB fatal error to complete"); + Thread.currentThread().interrupt(); + } + } + } finally { + oobSignalLock.unlock(); + } + taskReporter.unregisterTask(task.getTaskAttemptID()); + if (taskKillStartTime != 0) { + LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime)); + } + Thread.interrupted(); + } + } + + public void killTask() { + synchronized (this) { + if (isRunningState()) { + trySettingEndReason(EndReason.KILL_REQUESTED); + if (taskRunnerCallable != null) { + taskKillStartTime = System.currentTimeMillis(); + taskRunnerCallable.interruptTask(); + } + } + } + } + + + // Checks and changes on these states should happen within a synchronized block, + // to ensure the first event is the one that is captured and causes specific behaviour. + private boolean isRunningState() { + return !taskComplete.get() && !killTaskRequested.get() && !stopContainerRequested.get() && + !errorSeen.get(); + } + + class UmbilicalAndErrorHandler implements TezUmbilical, ErrorReporter { + + @Override + public void addEvents(Collection events) { + // Incoming events from the running task. + // Only add these if the task is running. + if (isRunningState()) { + taskReporter.addEvents(task.getTaskAttemptID(), events); + } + } + + @Override + public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, String message, + EventMetaData sourceInfo) { + // Fatal error reported by the task. + boolean isFirstError = false; + synchronized (TezTaskRunner2.this) { + if (isRunningState()) { + if (trySettingEndReason(EndReason.TASK_ERROR)) { + if (t == null) { + t = new RuntimeException( + message == null ? "FatalError: No user message or exception specified" : message); + } + registerFirstException(t, sourceInfo); + LOG.info("Received notification of a fatal error which will cause the task to die", t); + isFirstError = true; + errorReporterToAm.set(true); + oobSignalErrorInProgress = true; + } else { + LOG.info( + "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ", + firstEndReason, (t == null ? message : t.getMessage())); + } + } + } + + // Informing the TaskReporter here because the running task may not be interruptable. + // Has to be outside the lock. + if (isFirstError) { + killTask(); + try { + taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo); + } catch (IOException e) { + // Comm failure. Task can't do much. The main exception is already registered. + handleFinalStatusUpdateFailure(e, true); + } catch (TezException e) { + // Failure from AM. Task can't do much. The main exception is already registered. + handleFinalStatusUpdateFailure(e, true); + } finally { + oobSignalLock.lock(); + try { + // This message is being sent outside of the main thread, which may end up completing before + // this thread runs. Make sure the main run thread does not end till this completes. + oobSignalErrorInProgress = false; + oobSignalCondition.signal(); + } finally { + oobSignalLock.unlock(); + } + } + } + } + + @Override + public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException { + // Task checking whether it can commit. + + // Not getting a lock here. It should be alright for the to check with the reporter + // on whether a task can commit. + if (isRunningState()) { + return taskReporter.canCommit(taskAttemptID); + // If there's a communication failure here, let it propagate through to the task. + // which may throw it back or handle it appropriately. + } else { + // Don't throw an error since the task is already in the process of shutting down. + LOG.info("returning canCommit=false since task is not in a running state"); + return false; + } + } + + + @Override + public void reportError(Throwable t) { + // Umbilical reporting an error during heartbeat + boolean isFirstError = false; + synchronized (TezTaskRunner2.this) { + if (isRunningState()) { + LOG.info("TaskReporter reporter error which will cause the task to fail", t); + if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) { + registerFirstException(t, null); + isFirstError = true; + } + // A race is possible between a task succeeding, and a subsequent timed heartbeat failing. + // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded + // method does not throw an exception, in which case task success is registered with the AM. + // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter + } else { + LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID() + + " is already complete, is failing or has been asked to terminate"); + } + } + // Since this error came from the taskReporter - there's no point attempting to report a failure back to it. + if (isFirstError) { + killTask(); + } + } + + @Override + public void shutdownRequested() { + // Umbilical informing about a shutdown request for the container. + boolean isFirstTerminate = false; + synchronized (TezTaskRunner2.this) { + isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED); + // Respect stopContainerRequested since it can come in at any point, despite a previous failure. + stopContainerRequested.set(true); + } + + if (isFirstTerminate) { + killTask(); + } + } + } + + private synchronized boolean trySettingEndReason(EndReason endReason) { + if (isRunningState()) { + firstEndReason = endReason; + return true; + } + return false; + } + + + private void registerFirstException(Throwable t, EventMetaData sourceInfo) { + Preconditions.checkState(isRunningState()); + errorSeen.set(true); + firstException = t; + this.exceptionSourceInfo = sourceInfo; + } + + + private String getTaskDiagnosticsString(Throwable t, String message) { + String diagnostics; + if (t != null && message != null) { + diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage=" + + message; + } else if (t == null && message == null) { + diagnostics = "Unknown error"; + } else { + diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + : " errorMessage=" + message; + } + return diagnostics; + } + + private TaskRunner2Result logAndReturnEndResult(EndReason endReason, Throwable firstError, + boolean stopContainerRequested) { + TaskRunner2Result result = new TaskRunner2Result(endReason, firstError, stopContainerRequested); + LOG.info("TaskRunnerResult for {} : {} ", task.getTaskAttemptID(), result); + return result; + } + + private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) { + // TODO Ideally differentiate between FAILED/KILLED + LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t); + } +} \ No newline at end of file