Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 12CA318957 for ; Fri, 21 Aug 2015 01:36:10 +0000 (UTC) Received: (qmail 98566 invoked by uid 500); 21 Aug 2015 01:36:10 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 98394 invoked by uid 500); 21 Aug 2015 01:36:09 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 97578 invoked by uid 99); 21 Aug 2015 01:36:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 01:36:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55B9EE7DF8; Fri, 21 Aug 2015 01:36:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 21 Aug 2015 01:36:30 -0000 Message-Id: <003d5fed46184a508816e48c450c6d52@git.apache.org> In-Reply-To: <0a6214bdcf644e979ab2906bb3bbf947@git.apache.org> References: <0a6214bdcf644e979ab2906bb3bbf947@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/50] [abbrv] tez git commit: TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons. (sseth) TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons. 
(sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b997371 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b997371 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b997371 Branch: refs/heads/TEZ-2003 Commit: 3b9973710c46235620998d85f94e0d24bdf91126 Parents: 098aa8e Author: Siddharth Seth Authored: Thu May 28 02:01:04 2015 -0700 Committer: Siddharth Seth Committed: Thu Aug 20 18:22:07 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/common/TezUtilsInternal.java | 31 ++++++++----- .../apache/tez/dag/api/ContainerEndReason.java | 27 +++++++++++ .../tez/dag/api/TaskAttemptEndReason.java | 13 +++--- .../apache/tez/dag/api/TaskCommunicator.java | 11 +++-- .../apache/tez/dag/app/TaskAttemptListener.java | 6 ++- .../dag/app/TaskAttemptListenerImpTezDag.java | 9 ++-- .../tez/dag/app/TezTaskCommunicatorImpl.java | 6 ++- .../rm/container/AMContainerEventCompleted.java | 41 +++++++++++++++++ .../dag/app/rm/container/AMContainerImpl.java | 35 ++++++++------- .../app/TestTaskAttemptListenerImplTezDag.java | 8 ++-- .../dag/app/rm/container/TestAMContainer.java | 47 +++++++++++--------- .../TezTestServiceTaskCommunicatorImpl.java | 9 ++-- .../apache/tez/runtime/task/TezTaskRunner2.java | 20 +++++++-- 14 files changed, 186 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index d651960..e333832 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -28,5 +28,6 @@ ALL CHANGES: TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. TEZ-2465. Retrun the status of a kill request in TaskRunner2. TEZ-2471. 
NPE in LogicalIOProcessorRuntimeTask while printing thread info. + TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 347a4f6..0bdeb79 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -246,10 +246,16 @@ public class TezUtilsInternal { return TaskAttemptTerminationCause.COMMUNICATION_ERROR; case SERVICE_BUSY: return TaskAttemptTerminationCause.SERVICE_BUSY; - case INTERRUPTED_BY_SYSTEM: - return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM; - case INTERRUPTED_BY_USER: - return TaskAttemptTerminationCause.INTERRUPTED_BY_USER; + case INTERNAL_PREEMPTION: + return TaskAttemptTerminationCause.INTERNAL_PREEMPTION; + case EXTERNAL_PREEMPTION: + return TaskAttemptTerminationCause.EXTERNAL_PREEMPTION; + case APPLICATION_ERROR: + return TaskAttemptTerminationCause.APPLICATION_ERROR; + case FRAMEWORK_ERROR: + return TaskAttemptTerminationCause.FRAMEWORK_ERROR; + case NODE_FAILED: + return TaskAttemptTerminationCause.NODE_FAILED; case OTHER: return TaskAttemptTerminationCause.UNKNOWN_ERROR; default: @@ -267,20 +273,24 @@ public class TezUtilsInternal { return TaskAttemptEndReason.COMMUNICATION_ERROR; case SERVICE_BUSY: return TaskAttemptEndReason.SERVICE_BUSY; + case INTERNAL_PREEMPTION: + return TaskAttemptEndReason.INTERNAL_PREEMPTION; + case EXTERNAL_PREEMPTION: + return TaskAttemptEndReason.EXTERNAL_PREEMPTION; + case APPLICATION_ERROR: + return TaskAttemptEndReason.APPLICATION_ERROR; + case FRAMEWORK_ERROR: + return TaskAttemptEndReason.FRAMEWORK_ERROR; + case 
NODE_FAILED: + return TaskAttemptEndReason.NODE_FAILED; case INTERRUPTED_BY_SYSTEM: - return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM; case INTERRUPTED_BY_USER: - return TaskAttemptEndReason.INTERRUPTED_BY_USER; case UNKNOWN_ERROR: case TERMINATED_BY_CLIENT: case TERMINATED_AT_SHUTDOWN: - case INTERNAL_PREEMPTION: - case EXTERNAL_PREEMPTION: case TERMINATED_INEFFECTIVE_SPECULATION: case TERMINATED_EFFECTIVE_SPECULATION: case TERMINATED_ORPHANED: - case APPLICATION_ERROR: - case FRAMEWORK_ERROR: case INPUT_READ_ERROR: case OUTPUT_WRITE_ERROR: case OUTPUT_LOST: @@ -288,7 +298,6 @@ public class TezUtilsInternal { case CONTAINER_LAUNCH_FAILED: case CONTAINER_EXITED: case CONTAINER_STOPPED: - case NODE_FAILED: case NODE_DISK_ERROR: default: return TaskAttemptEndReason.OTHER; http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java new file mode 100644 index 0000000..e13e886 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +// TODO TEZ-2003 Expose as a public API +public enum ContainerEndReason { + NODE_FAILED, // Completed because the node running the container was marked as dead + INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision + EXTERNAL_PREEMPTION, // Preempted due to cluster contention + APPLICATION_ERROR, // An error in the AM caused by user code + FRAMEWORK_ERROR, // An error in the AM - likely a bug. + LAUNCH_FAILED, // Failure to launch the container + COMPLETED, // Completed via normal flow + OTHER +} http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java index 96a4768..de78d21 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java +++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java @@ -16,9 +16,12 @@ package org.apache.tez.dag.api; // TODO TEZ-2003 Expose as a public API public enum TaskAttemptEndReason { - COMMUNICATION_ERROR, - SERVICE_BUSY, - INTERRUPTED_BY_SYSTEM, - INTERRUPTED_BY_USER, - OTHER + NODE_FAILED, // Completed because the node running the container was marked as dead + COMMUNICATION_ERROR, // Communication error with the task + SERVICE_BUSY, // External service busy + INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision + EXTERNAL_PREEMPTION, // Preempted due to cluster contention + APPLICATION_ERROR, // An error in the AM caused by user code + FRAMEWORK_ERROR, // An error in the AM - likely a bug. 
+ OTHER // Unknown reason } http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 2651013..d0a006b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -36,7 +36,10 @@ public abstract class TaskCommunicator extends AbstractService { public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port); // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct. - public abstract void registerContainerEnd(ContainerId containerId); + public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason); + + // TODO TEZ-2003 Provide additional inforamtion like reason for container end / Task end. + // Was it caused by preemption - or as a result of a general task completion / container completion // TODO TEZ-2003 TaskSpec breakup into a clean interface // TODO TEZ-2003 Add support for priority @@ -48,11 +51,7 @@ public abstract class TaskCommunicator extends AbstractService { // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness. // TODO TEZ-2003 Remove reference to TaskAttemptID - // TODO TEZ-2003 This needs some information about why the attempt is being unregistered. - // e.g. preempted in which case the task may need to be informed. Alternately as a result of - // a failed task. - // In case of preemption - a killTask API is likely a better bet than trying to overload this method. 
- public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID); + public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason); // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM. public abstract InetSocketAddress getAddress(); http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java index e4dad27..92e38ae 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java @@ -21,6 +21,8 @@ package org.apache.tez.dag.app; import java.net.InetSocketAddress; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.rm.container.AMContainerTask; @@ -34,9 +36,9 @@ public interface TaskAttemptListener { void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId); - void unregisterRunningContainer(ContainerId containerId, int taskCommId); + void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason); - void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId); + void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason); void dagComplete(DAG dag); http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index db78fa9..1c61a0d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.ListUtils; +import org.apache.tez.dag.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; @@ -355,7 +356,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override - public void unregisterRunningContainer(ContainerId containerId, int taskCommId) { + public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) { if (LOG.isDebugEnabled()) { LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); } @@ -363,7 +364,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements if (containerInfo.taskAttemptId != null) { registeredAttempts.remove(containerInfo.taskAttemptId); } - taskCommunicators[taskCommId].registerContainerEnd(containerId); + taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason); } @Override @@ -404,7 +405,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override - public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) { + public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) { ContainerId containerId = registeredAttempts.remove(attemptId); if 
(containerId == null) { LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); @@ -418,7 +419,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. registeredContainers.put(containerId, NULL_CONTAINER_INFO); - taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId); + taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index accde2c..3774eb4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -41,6 +41,8 @@ import org.apache.tez.common.ContainerContext; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TaskHeartbeatRequest; @@ -185,7 +187,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } @Override - public void registerContainerEnd(ContainerId containerId) { + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { ContainerInfo containerInfo = registeredContainers.remove(containerId); if (containerInfo != null) { synchronized(containerInfo) { @@ -231,7 +233,7 @@ public 
class TezTaskCommunicatorImpl extends TaskCommunicator { @Override - public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) { TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID); ContainerId containerId = attemptToContainerMap.remove(taskAttempt); if(containerId == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java index 9bb6d7f..8ef2a83 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app.rm.container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.ContainerEndReason; import org.apache.tez.dag.records.TaskAttemptTerminationCause; public class AMContainerEventCompleted extends AMContainerEvent { @@ -61,4 +62,44 @@ public class AMContainerEventCompleted extends AMContainerEvent { return errCause; } + public ContainerEndReason getContainerEndReason() { + if (errCause != null) { + switch (errCause) { + case INTERNAL_PREEMPTION: + return ContainerEndReason.INTERNAL_PREEMPTION; + case EXTERNAL_PREEMPTION: + return ContainerEndReason.EXTERNAL_PREEMPTION; + case FRAMEWORK_ERROR: + return ContainerEndReason.FRAMEWORK_ERROR; + case APPLICATION_ERROR: + return ContainerEndReason.APPLICATION_ERROR; + case CONTAINER_LAUNCH_FAILED: + return ContainerEndReason.LAUNCH_FAILED; + case NODE_FAILED: + 
return ContainerEndReason.NODE_FAILED; + case CONTAINER_EXITED: + return ContainerEndReason.COMPLETED; + case UNKNOWN_ERROR: + case TERMINATED_BY_CLIENT: + case TERMINATED_AT_SHUTDOWN: + case TERMINATED_INEFFECTIVE_SPECULATION: + case TERMINATED_EFFECTIVE_SPECULATION: + case TERMINATED_ORPHANED: + case INPUT_READ_ERROR: + case OUTPUT_WRITE_ERROR: + case OUTPUT_LOST: + case TASK_HEARTBEAT_ERROR: + case CONTAINER_STOPPED: + case NODE_DISK_ERROR: + case COMMUNICATION_ERROR: + case SERVICE_BUSY: + case INTERRUPTED_BY_SYSTEM: + case INTERRUPTED_BY_USER: + default: + return ContainerEndReason.OTHER; + } + } else { + return ContainerEndReason.OTHER; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 8685556..7446734 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -27,6 +27,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.Credentials; @@ -635,7 +638,7 @@ public class AMContainerImpl implements AMContainer { container.sendTerminatingToTaskAttempt(container.currentAttempt, event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED); } - container.unregisterFromTAListener(); + 
container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED); container.deAllocate(); } } @@ -665,7 +668,7 @@ public class AMContainerImpl implements AMContainer { } container.containerLocalResources = null; container.additionalLocalResources = null; - container.unregisterFromTAListener(); + container.unregisterFromTAListener(event.getContainerEndReason()); String diag = event.getDiagnostics(); if (!(diag == null || diag.equals(""))) { LOG.info("Container " + container.getContainerId() @@ -691,7 +694,7 @@ public class AMContainerImpl implements AMContainer { container.sendTerminatingToTaskAttempt(container.currentAttempt, getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED); } - container.unregisterFromTAListener(); + container.unregisterFromTAListener(ContainerEndReason.OTHER); container.logStopped(container.currentAttempt == null ? ContainerExitStatus.SUCCESS : ContainerExitStatus.INVALID); @@ -743,7 +746,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.unregisterFromTAListener(); + container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED); container.deAllocate(); } } @@ -760,7 +763,7 @@ public class AMContainerImpl implements AMContainer { container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR); } container.logStopped(ContainerExitStatus.ABORTED); - container.unregisterFromTAListener(); + container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR); container.sendStopRequestToNM(); } } @@ -832,7 +835,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; - container.unregisterAttemptFromListener(container.currentAttempt); + container.unregisterAttemptFromListener(container.currentAttempt, 
TaskAttemptEndReason.FRAMEWORK_ERROR); container.handleExtraTAAssign(event, container.currentAttempt); } } @@ -843,7 +846,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { container.lastTaskFinishTime = System.currentTimeMillis(); container.completedAttempts.add(container.currentAttempt); - container.unregisterAttemptFromListener(container.currentAttempt); + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER); container.currentAttempt = null; } } @@ -860,7 +863,7 @@ public class AMContainerImpl implements AMContainer { container.sendTerminatedToTaskAttempt(container.currentAttempt, getMessage(container, event), event.getTerminationCause()); } - container.unregisterAttemptFromListener(container.currentAttempt); + container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause())); container.registerFailedAttempt(container.currentAttempt); container.currentAttempt= null; super.transition(container, cEvent); @@ -870,7 +873,7 @@ public class AMContainerImpl implements AMContainer { protected static class StopRequestAtRunningTransition extends StopRequestAtIdleTransition { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { - container.unregisterAttemptFromListener(container.currentAttempt); + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER); super.transition(container, cEvent); } } @@ -891,7 +894,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.unregisterAttemptFromListener(container.currentAttempt); + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED); } } @@ -900,7 +903,7 @@ public class AMContainerImpl implements 
AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); - container.unregisterAttemptFromListener(container.currentAttempt); + container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR); container.sendTerminatingToTaskAttempt(container.currentAttempt, "Container " + container.getContainerId() + " hit an invalid transition - " + cEvent.getType() + " at " + @@ -1026,7 +1029,7 @@ public class AMContainerImpl implements AMContainer { LOG.warn(errorMessage); this.logStopped(ContainerExitStatus.INVALID); this.sendStopRequestToNM(); - this.unregisterFromTAListener(); + this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR); this.unregisterFromContainerListener(); } @@ -1084,8 +1087,8 @@ public class AMContainerImpl implements AMContainer { container.getNodeId(), container.getContainerToken(), launcherId)); } - protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) { - taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId); + protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) { + taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason); } protected void registerAttemptWithListener(AMContainerTask amContainerTask) { @@ -1096,8 +1099,8 @@ public class AMContainerImpl implements AMContainer { taskAttemptListener.registerRunningContainer(containerId, taskCommId); } - protected void unregisterFromTAListener() { - this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId); + protected void unregisterFromTAListener(ContainerEndReason endReason) { + this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason); } protected void registerWithContainerListener() { http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 34b9792..68d3baf 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -47,6 +47,8 @@ import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; @@ -163,12 +165,12 @@ public class TestTaskAttemptListenerImplTezDag { assertEquals(taskSpec, containerTask.getTaskSpec()); // Task unregistered. Should respond to heartbeats - taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0); + taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER); containerTask = tezUmbilical.getTask(containerContext2); assertNull(containerTask); // Container unregistered. 
Should send a shouldDie = true - taskAttemptListener.unregisterRunningContainer(containerId2, 0); + taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER); containerTask = tezUmbilical.getTask(containerContext2); assertTrue(containerTask.shouldDie()); @@ -182,7 +184,7 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0); taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0); - taskAttemptListener.unregisterRunningContainer(containerId3, 0); + taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER); containerTask = tezUmbilical.getTask(containerContext3); assertTrue(containerTask.shouldDie()); } http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 44dcd1f..322eabc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.ContainerEndReason; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; @@ -135,14 +137,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); 
wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -184,13 +186,13 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER); wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -235,7 +237,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER); TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taId2); @@ -250,14 +252,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(taId2, 0); + 
verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); verify(wc.chh).unregister(wc.containerID); assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); @@ -290,7 +292,7 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -327,7 +329,7 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -350,7 +352,7 @@ public class TestAMContainer { wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. 
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -388,7 +390,7 @@ public class TestAMContainer { wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -424,7 +426,7 @@ public class TestAMContainer { wc.containerTimedOut(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -458,7 +460,7 @@ public class TestAMContainer { wc.stopRequest(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. 
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -492,7 +494,7 @@ public class TestAMContainer { wc.launchFailed(); wc.verifyState(AMContainerState.STOPPING); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -542,7 +544,7 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -572,7 +574,7 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -603,7 +605,7 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -634,7 +636,7 @@ public 
class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -663,7 +665,7 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -698,7 +700,7 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -735,7 +737,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, + ContainerEndReason.INTERNAL_PREEMPTION); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -772,7 +775,7 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); 
wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index cf28b11..98673a6 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.ContainerEndReason; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; @@ -98,8 +99,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } @Override - public void registerContainerEnd(ContainerId containerId) { - super.registerContainerEnd(containerId); + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { + super.registerContainerEnd(containerId, endReason); } @Override @@ -175,8 +176,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } @Override - 
public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { - super.unregisterRunningTaskAttempt(taskAttemptID); + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) { + super.unregisterRunningTaskAttempt(taskAttemptID, endReason); // Nothing else to do for now. The push API in the test does not support termination of a running task } http://git-wip-us.apache.org/repos/asf/tez/blob/3b997371/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 3bf9f84..15629fd 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -261,7 +261,13 @@ public class TezTaskRunner2 { taskRunnerCallable.interruptTask(); } return true; + } else { + LOG.info("Ignoring killTask request for {} since end reason is already set to {}", + task.getTaskAttemptID(), firstEndReason); } + } else { + LOG.info("Ignoring killTask request for {} since it is not in a running state", + task.getTaskAttemptID()); } } return false; @@ -389,10 +395,18 @@ public class TezTaskRunner2 { isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED); // Respect stopContainerRequested since it can come in at any point, despite a previous failure. 
stopContainerRequested.set(true); - } - if (isFirstTerminate) { - killTask(); + if (isFirstTerminate) { + LOG.info("Attempting to abort {} since a shutdown request was received", + task.getTaskAttemptID()); + if (taskRunnerCallable != null) { + taskKillStartTime = System.currentTimeMillis(); + taskRunnerCallable.interruptTask(); + } + } else { + LOG.info("Not acting on shutdown request for {} since the task is not in running state", + task.getTaskAttemptID()); + } } } }