Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C8DBD17389 for ; Tue, 7 Apr 2015 20:12:20 +0000 (UTC) Received: (qmail 88041 invoked by uid 500); 7 Apr 2015 20:12:20 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 87961 invoked by uid 500); 7 Apr 2015 20:12:20 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 86664 invoked by uid 99); 7 Apr 2015 20:12:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 20:12:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB239E2F1C; Tue, 7 Apr 2015 20:12:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Tue, 07 Apr 2015 20:12:53 -0000 Message-Id: <2faee77f66e843c8898a499a25b6fdbf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [35/35] tez git commit: TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. (sseth) TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7db5a8d0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7db5a8d0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7db5a8d0 Branch: refs/heads/TEZ-2003 Commit: 7db5a8d0c6a6748ac1dd0104f5217a55f68b0875 Parents: f0a7567 Author: Siddharth Seth Authored: Tue Mar 10 01:25:39 2015 -0700 Committer: Siddharth Seth Committed: Tue Apr 7 13:11:45 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/common/TezUtilsInternal.java | 60 +++++++++ .../tez/dag/api/TaskAttemptEndReason.java | 24 ++++ .../records/TaskAttemptTerminationCause.java | 7 +- .../apache/tez/dag/api/TaskCommunicator.java | 2 + .../tez/dag/api/TaskCommunicatorContext.java | 13 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 33 +++++ .../event/TaskAttemptEventAttemptFailed.java | 2 + .../event/TaskAttemptEventAttemptKilled.java | 47 +++++++ .../dag/app/dag/event/TaskAttemptEventType.java | 5 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 33 ++++- .../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 9 +- .../dag/app/rm/LocalTaskSchedulerService.java | 3 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 7 +- .../tez/dag/app/rm/TaskSchedulerService.java | 6 +- .../dag/app/rm/YarnTaskSchedulerService.java | 8 +- .../app/TestTaskAttemptListenerImplTezDag.java | 1 + .../app/TestTaskAttemptListenerImplTezDag2.java | 126 +++++++++++++++++++ .../tez/dag/app/rm/TestContainerReuse.java | 65 +++++----- .../app/rm/TestLocalTaskSchedulerService.java | 5 +- .../tez/dag/app/rm/TestTaskScheduler.java | 18 +-- .../rm/TezTestServiceTaskSchedulerService.java | 3 +- .../TezTestServiceTaskCommunicatorImpl.java | 36 +++++- .../org/apache/tez/service/ContainerRunner.java | 5 +- .../tez/service/MiniTezTestServiceCluster.java | 5 +- .../tez/service/impl/ContainerRunnerImpl.java | 60 +++++++-- .../apache/tez/service/impl/TezTestService.java | 6 +- .../impl/TezTestServiceProtocolServerImpl.java | 10 +- .../tez/tests/TestExternalTezServices.java | 29 +++++ 29 files changed, 548 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 7726815..774a685 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -10,5 +10,6 @@ ALL CHANGES: TEZ-2138. Fix minor bugs in adding default scheduler, getting launchers. TEZ-2139. Update tez version to 0.7.0-TEZ-2003-SNAPSHOT. TEZ-2175. Task priority should be available to the TaskCommunicator plugin. + TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 9c78377..347a4f6 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.log4j.Appender; import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Stopwatch; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; @Private public class TezUtilsInternal { @@ -234,4 +236,62 @@ public class TezUtilsInternal { return sb.toString(); } + public static TaskAttemptTerminationCause fromTaskAttemptEndReason( + TaskAttemptEndReason taskAttemptEndReason) { + if (taskAttemptEndReason == null) { + return null; + } + switch (taskAttemptEndReason) { + case COMMUNICATION_ERROR: + return TaskAttemptTerminationCause.COMMUNICATION_ERROR; + case SERVICE_BUSY: + return TaskAttemptTerminationCause.SERVICE_BUSY; + case INTERRUPTED_BY_SYSTEM: + return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM; + case INTERRUPTED_BY_USER: + return TaskAttemptTerminationCause.INTERRUPTED_BY_USER; + case OTHER: + return TaskAttemptTerminationCause.UNKNOWN_ERROR; + default: + return TaskAttemptTerminationCause.UNKNOWN_ERROR; + } + } + + public static TaskAttemptEndReason toTaskAttemptEndReason(TaskAttemptTerminationCause cause) { + // TODO Post TEZ-2003. Consolidate these states, and mappings. + if (cause == null) { + return null; + } + switch (cause) { + case COMMUNICATION_ERROR: + return TaskAttemptEndReason.COMMUNICATION_ERROR; + case SERVICE_BUSY: + return TaskAttemptEndReason.SERVICE_BUSY; + case INTERRUPTED_BY_SYSTEM: + return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM; + case INTERRUPTED_BY_USER: + return TaskAttemptEndReason.INTERRUPTED_BY_USER; + case UNKNOWN_ERROR: + case TERMINATED_BY_CLIENT: + case TERMINATED_AT_SHUTDOWN: + case INTERNAL_PREEMPTION: + case EXTERNAL_PREEMPTION: + case TERMINATED_INEFFECTIVE_SPECULATION: + case TERMINATED_EFFECTIVE_SPECULATION: + case TERMINATED_ORPHANED: + case APPLICATION_ERROR: + case FRAMEWORK_ERROR: + case INPUT_READ_ERROR: + case OUTPUT_WRITE_ERROR: + case OUTPUT_LOST: + case TASK_HEARTBEAT_ERROR: + case CONTAINER_LAUNCH_FAILED: + case CONTAINER_EXITED: + case CONTAINER_STOPPED: + case NODE_FAILED: + case NODE_DISK_ERROR: + default: + return TaskAttemptEndReason.OTHER; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java new file mode 100644 index 0000000..96a4768 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +// TODO TEZ-2003 Expose as a public API +public enum TaskAttemptEndReason { + COMMUNICATION_ERROR, + SERVICE_BUSY, + INTERRUPTED_BY_SYSTEM, + INTERRUPTED_BY_USER, + OTHER +} http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java index ef0bb33..7112d9e 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java @@ -41,5 +41,10 @@ public enum TaskAttemptTerminationCause { CONTAINER_STOPPED, // Container stopped or released by Tez NODE_FAILED, // Node for the container failed NODE_DISK_ERROR, // Disk failed on the node runnign the task - + + COMMUNICATION_ERROR, // Equivalent to a launch failure + SERVICE_BUSY, // Service rejected the task + INTERRUPTED_BY_SYSTEM, // Interrupted by the system. e.g. Pre-emption + INTERRUPTED_BY_USER, // Interrupted by the user + } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 82eed20..945091e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -45,6 +45,8 @@ public abstract class TaskCommunicator extends AbstractService { Credentials credentials, boolean credentialsChanged, int priority); + // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness. + // TODO TEZ-2003 Remove reference to TaskAttemptID public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 41675fe..a85fb7f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.api; +import javax.annotation.Nullable; import java.io.IOException; import org.apache.hadoop.security.Credentials; @@ -37,15 +38,21 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 Move to vertex, taskIndex, version boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; + // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; boolean isKnownContainer(ContainerId containerId); - // TODO TEZ-2003 Move to vertex, taskIndex, version + // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId); - // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure. - // This will have to take into consideration the TA_FAILED event + // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* + void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); + + // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* + void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics); + + // TODO TEZ-2003 API. Should a method exist for task succeeded. // TODO Eventually Add methods to report availability stats to the scheduler. } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 7973775..d2c3304 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -26,14 +26,17 @@ import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; @@ -43,7 +46,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -234,6 +240,33 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements pingContainerHeartbeatHandler(containerId); } + @Override + public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + String diagnostics) { + // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, + // and messages from the scheduler will release the container. + // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore, + // instead of waiting for the unregister to flow through the Container. + // Fix along the same lines as TEZ-2124 by introducing an explict context. + context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId, + diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( + taskAttemptEndReason))); + } + + @Override + public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, + String diagnostics) { + // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler, + // and messages from the scheduler will release the container. + // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore, + // instead of waiting for the unregister to flow through the Container. + // Fix along the same lines as TEZ-2124 by introducing an explict context. + context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId, + TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason( + taskAttemptEndReason))); + } + + /** * Child checking whether it can commit. *

http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java index b9c1d09..7ec8921 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java @@ -26,6 +26,8 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent private final String diagnostics; private final TaskAttemptTerminationCause errorCause; + + /* Accepted Types - FAILED, TIMED_OUT */ public TaskAttemptEventAttemptFailed(TezTaskAttemptID id, TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) { super(id, type); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java new file mode 100644 index 0000000..72e6b07 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezTaskAttemptID; + +public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent + implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent { + + private final String diagnostics; + private final TaskAttemptTerminationCause errorCause; + public TaskAttemptEventAttemptKilled(TezTaskAttemptID id, + String diagnostics, + TaskAttemptTerminationCause errorCause) { + super(id, TaskAttemptEventType.TA_KILLED); + this.diagnostics = diagnostics; + this.errorCause = errorCause; + } + + @Override + public String getDiagnosticInfo() { + return diagnostics; + } + + @Override + public TaskAttemptTerminationCause getTerminationCause() { + return errorCause; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java index b7aca36..6d20368 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java @@ -26,14 +26,15 @@ public enum TaskAttemptEventType { //Producer:Task, Speculator TA_SCHEDULE, -//Producer: TaskAttemptListener +//Producer: TaskAttemptListener | Vertex after routing events TA_STARTED_REMOTELY, TA_STATUS_UPDATE, TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED TA_DONE, TA_FAILED, + TA_KILLED, // Generated by TaskCommunicators TA_TIMED_OUT, - + //Producer: Client, Scheduler, On speculation. TA_KILL_REQUEST, http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e1851c4..6c2deb7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.Records; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -179,6 +180,11 @@ public class TaskAttemptImpl implements TaskAttempt, private final StateMachine stateMachine; + // TODO TEZ-2003 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before + // TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating + // with the listening service and getting a response, which in turn can trigger STATUS_UPDATES / FAILED / KILLED + + // TA_KILLED handled the same as TA_KILL_REQUEST. Just a different name indicating a request / already killed. private static StateMachineFactory stateMachineFactory @@ -219,6 +225,10 @@ public class TaskAttemptImpl implements TaskAttempt, new TerminatedBeforeRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, + TaskAttemptEventType.TA_KILLED, + new TerminatedBeforeRunningTransition(KILLED_HELPER)) + .addTransition(TaskAttemptStateInternal.START_WAIT, + TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition()) .addTransition(TaskAttemptStateInternal.START_WAIT, @@ -259,6 +269,10 @@ public class TaskAttemptImpl implements TaskAttempt, new TerminatedWhileRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, + TaskAttemptEventType.TA_KILLED, + new TerminatedWhileRunningTransition(KILLED_HELPER)) + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.RUNNING, @@ -297,6 +311,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_KILL_REQUEST, + TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED)) @@ -318,6 +333,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_KILL_REQUEST, + TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED)) @@ -336,6 +352,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_KILL_REQUEST, + TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, @@ -355,6 +372,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_KILL_REQUEST, + TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, @@ -377,6 +395,12 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), + TaskAttemptEventType.TA_KILLED, + new TerminatedAfterSuccessTransition()) + .addTransition( + TaskAttemptStateInternal.SUCCEEDED, + EnumSet.of(TaskAttemptStateInternal.KILLED, + TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition()) .addTransition( @@ -424,7 +448,6 @@ public class TaskAttemptImpl implements TaskAttempt, this.leafVertex = leafVertex; } - @Override public TezTaskAttemptID getID() { return attemptId; @@ -1019,6 +1042,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Compute node/rack location request even if re-scheduled. Set racks = new HashSet(); + // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define localicty for different attempts. TaskLocationHint locationHint = ta.getTaskLocationHint(); if (locationHint != null) { if (locationHint.getRacks() != null) { @@ -1093,6 +1117,8 @@ public class TaskAttemptImpl implements TaskAttempt, @Override public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { + // This transition should not be invoked directly, if a scheduler event has already been sent out. + // Sub-classes should be used if a scheduler request has been sent. ta.setFinishTime(); if (event instanceof DiagnosableEvent) { @@ -1207,7 +1233,8 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the scheduler if (sendSchedulerEvent()) { ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper - .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier())); + .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), + ta.getVertex().getTaskSchedulerIdentifier())); } } } @@ -1288,7 +1315,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the Scheduler. ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, - TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier())); + TaskAttemptState.SUCCEEDED, null, ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java index 2ace642..a775948 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app.rm; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -27,14 +28,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { private final TaskAttempt attempt; private final ContainerId containerId; private final TaskAttemptState state; + private final TaskAttemptEndReason taskAttemptEndReason; private final int schedulerId; public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId, - TaskAttemptState state, int schedulerId) { + TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, int schedulerId) { super(AMSchedulerEventType.S_TA_ENDED); this.attempt = attempt; this.containerId = containerId; this.state = state; + this.taskAttemptEndReason = taskAttemptEndReason; this.schedulerId = schedulerId; } @@ -57,4 +60,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { public int getSchedulerId() { return schedulerId; } + + public TaskAttemptEndReason getTaskAttemptEndReason() { + return taskAttemptEndReason; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index ecb7ad7..fa5f47c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; @@ -149,7 +150,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { } @Override - public boolean deallocateTask(Object task, boolean taskSucceeded) { + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) { return taskRequestHandler.addDeallocateTaskRequest(task); } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 193df49..c0a2e31 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -291,7 +291,9 @@ public class TaskSchedulerEventHandler extends AbstractService private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) { TaskAttempt attempt = event.getAttempt(); - boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false); + // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation. + boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()] + .deallocateTask(attempt, false, event.getTaskAttemptEndReason()); // use stored value of container id in case the scheduler has removed this // assignment because the task has been deallocated earlier. // retroactive case @@ -313,6 +315,7 @@ public class TaskSchedulerEventHandler extends AbstractService sendEvent(new AMContainerEventStopRequest(attemptContainerId)); // Inform the Node - the task has asked to be STOPPED / has already // stopped. + // AMNodeImpl blacklisting logic does not account for KILLED attempts. sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers(). get(attemptContainerId).getContainer().getNodeId(), attemptContainerId, attempt.getID(), event.getState() == TaskAttemptState.FAILED)); @@ -334,7 +337,7 @@ public class TaskSchedulerEventHandler extends AbstractService } boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, - true); + true, null); if (!wasContainerAllocated) { LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task"); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java index 096069b..070f706 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.TaskAttemptEndReason; public abstract class TaskSchedulerService extends AbstractService{ @@ -61,8 +62,9 @@ public abstract class TaskSchedulerService extends AbstractService{ public abstract void allocateTask(Object task, Resource capability, ContainerId containerId, Priority priority, Object containerSignature, Object clientCookie); - - public abstract boolean deallocateTask(Object task, boolean taskSucceeded); + + /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */ + public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason); public abstract Object deallocateContainer(ContainerId containerId); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 66a6f33..788f77e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; @@ -987,10 +988,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService * the task to de-allocate. * @param taskSucceeded * specify whether the task succeeded or failed. + * @param endReason + * reason for the task ending * @return true if a container is assigned to this task. */ @Override - public boolean deallocateTask(Object task, boolean taskSucceeded) { + public boolean deallocateTask(Object task, boolean taskSucceeded, + TaskAttemptEndReason endReason) { Map assignedContainers = null; synchronized (this) { @@ -1180,7 +1184,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService CookieContainerRequest request = entry.getValue(); if (request.getPriority().equals(lowestPriNewContainer.getPriority())) { LOG.info("Resending request for task again: " + task); - deallocateTask(task, true); + deallocateTask(task, true, null); allocateTask(task, request.getCapability(), (request.getNodes() == null ? null : request.getNodes().toArray(new String[request.getNodes().size()])), http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 361565a..f63b636 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -46,6 +46,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; import org.junit.Test; +// TODO TEZ-2003 Rename to TestTezTaskCommunicator public class TestTaskAttemptListenerImplTezDag { @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java new file mode 100644 index 0000000..934543f --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.api.TaskAttemptEndReason; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled; +import org.apache.tez.dag.app.rm.container.AMContainer; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.apache.tez.dag.app.rm.container.AMContainerTask; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +// TODO TEZ-2003. Rename to TestTaskAttemptListener | whatever TaskAttemptListener is renamed to. +public class TestTaskAttemptListenerImplTezDag2 { + + @Test(timeout = 5000) + public void testTaskAttemptFailedKilled() { + ApplicationId appId = ApplicationId.newInstance(1000, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + Credentials credentials = new Credentials(); + AppContext appContext = mock(AppContext.class); + EventHandler eventHandler = mock(EventHandler.class); + DAG dag = mock(DAG.class); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + Map appAcls = new HashMap(); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn(dag).when(appContext).getCurrentDAG(); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(credentials).when(appContext).getAppCredentials(); + doReturn(appAcls).when(appContext).getApplicationACLs(); + doReturn(amContainerMap).when(appContext).getAllContainers(); + NodeId nodeId = NodeId.newInstance("localhost", 0); + AMContainer amContainer = mock(AMContainer.class); + Container container = mock(Container.class); + doReturn(nodeId).when(container).getNodeId(); + doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); + doReturn(container).when(amContainer).getContainer(); + + TaskAttemptListenerImpTezDag taskAttemptListener = + new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class), + mock(ContainerHeartbeatHandler.class), null, null, false); + + TaskSpec taskSpec1 = mock(TaskSpec.class); + TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class); + doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID(); + AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10); + + TaskSpec taskSpec2 = mock(TaskSpec.class); + TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class); + doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID(); + AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10); + + ContainerId containerId1 = createContainerId(appId, 1); + taskAttemptListener.registerRunningContainer(containerId1, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0); + ContainerId containerId2 = createContainerId(appId, 2); + taskAttemptListener.registerRunningContainer(containerId2, 0); + taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0); + + + taskAttemptListener + .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1"); + taskAttemptListener + .taskKilled(taskAttemptId2, TaskAttemptEndReason.SERVICE_BUSY, "Diagnostics2"); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(argumentCaptor.capture()); + assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed); + assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled); + TaskAttemptEventAttemptFailed failedEvent = + (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0); + TaskAttemptEventAttemptKilled killedEvent = + (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1); + + assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo()); + assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR, + failedEvent.getTerminationCause()); + + assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo()); + assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause()); + // TODO TEZ-2003. Verify unregistration from the registered list + } + + private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx); + return containerId; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index c1169ef..d45346a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -57,6 +58,7 @@ import org.apache.tez.common.MockDNSToSwitchMapping; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; @@ -92,6 +94,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.internal.matchers.Null; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -223,9 +226,9 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded( - ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0)); + ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(1)).taskAllocated( eq(ta31), any(Object.class), eq(containerHost1)); verify(rmClient, times(0)).releaseAssignedContainer( @@ -235,7 +238,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), - TaskAttemptState.SUCCEEDED, 0)); + TaskAttemptState.SUCCEEDED, null, 0)); long currentTs = System.currentTimeMillis(); Throwable exception = null; @@ -356,9 +359,9 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), - TaskAttemptState.SUCCEEDED, 0)); + TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta21), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(0)).taskAllocated( eq(ta31), any(Object.class), eq(containerHost2)); verify(rmClient, times(1)).releaseAssignedContainer( @@ -459,9 +462,9 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1)); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -469,19 +472,19 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta12), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); // Verify no re-use if a previous task fails. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0)); drainableAppCallback.drain(); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1)); - verify(taskScheduler).deallocateTask(eq(ta13), eq(false)); + verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null)); verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -496,9 +499,9 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2)); // Task assigned to container completed successfully. No pending requests. Container should be released. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta14), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta14), eq(true), eq((TaskAttemptEndReason)null)); verify(rmClient).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -607,9 +610,9 @@ public class TestContainerReuse { // First task had profiling on. This container can not be reused further. taskSchedulerEventHandler.handleEvent( - new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); + new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); @@ -653,9 +656,9 @@ public class TestContainerReuse { // Verify that the container can not be reused when profiling option is turned on // Even for 2 tasks having same profiling option can have container reusability. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta13), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container2)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId())); @@ -698,9 +701,9 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3)); //Ensure task 6 (of vertex 1) is allocated to same container - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta15), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3)); eventHandler.reset(); @@ -811,9 +814,9 @@ public class TestContainerReuse { // until delay expires. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), - TaskAttemptState.SUCCEEDED, 0)); + TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(0)).taskAllocated( eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -828,7 +831,7 @@ public class TestContainerReuse { // TA12 completed. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta12, container1.getId(), - TaskAttemptState.SUCCEEDED, 0)); + TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(3000l); @@ -946,9 +949,9 @@ public class TestContainerReuse { // Container should be assigned to task21. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), - TaskAttemptState.SUCCEEDED, 0)); + TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated( eq(ta21), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -956,7 +959,7 @@ public class TestContainerReuse { // Task 2 completes. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, container1.getId(), - TaskAttemptState.SUCCEEDED, 0)); + TaskAttemptState.SUCCEEDED, null, 0)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); LOG.info("Sleeping to ensure that the scheduling loop runs"); @@ -1065,9 +1068,9 @@ public class TestContainerReuse { assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta111), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1077,9 +1080,9 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta112), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1118,9 +1121,9 @@ public class TestContainerReuse { assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); eventHandler.reset(); - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta211), eq(true)); + verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index 25cf4b5..0a642bb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; @@ -94,7 +95,7 @@ public class TestLocalTaskSchedulerService { Task task = mock(Task.class); taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null); - taskSchedulerService.deallocateTask(task, false); + taskSchedulerService.deallocateTask(task, false, null); // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first taskSchedulerService.startRequestHandlerThread(); @@ -126,7 +127,7 @@ public class TestLocalTaskSchedulerService { MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler(); requestHandler.drainRequest(1); - taskSchedulerService.deallocateTask(task, false); + taskSchedulerService.deallocateTask(task, false, null); requestHandler.drainRequest(2); assertEquals(1, requestHandler.deallocateCount); assertEquals(1, requestHandler.allocateCount); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index dabae67..807e772 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -172,7 +172,7 @@ public class TestTaskScheduler { addContainerRequest((CookieContainerRequest) any()); // returned from task requests before allocation happens - assertFalse(scheduler.deallocateTask(mockTask1, true)); + assertFalse(scheduler.deallocateTask(mockTask1, true, null)); verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class)); verify(mockRMClient, times(1)). removeContainerRequest((CookieContainerRequest) any()); @@ -180,7 +180,7 @@ public class TestTaskScheduler { releaseAssignedContainer((ContainerId) any()); // deallocating unknown task - assertFalse(scheduler.deallocateTask(mockTask1, true)); + assertFalse(scheduler.deallocateTask(mockTask1, true, null)); verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class)); verify(mockRMClient, times(1)). removeContainerRequest((CookieContainerRequest) any()); @@ -325,7 +325,7 @@ public class TestTaskScheduler { verify(mockRMClient).releaseAssignedContainer(mockCId4); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask1, true)); + assertTrue(scheduler.deallocateTask(mockTask1, true, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId1); verify(mockRMClient).releaseAssignedContainer(mockCId1); @@ -445,7 +445,7 @@ public class TestTaskScheduler { verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any()); verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask4, true)); + assertTrue(scheduler.deallocateTask(mockTask4, true, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId6); verify(mockRMClient).releaseAssignedContainer(mockCId6); @@ -475,7 +475,7 @@ public class TestTaskScheduler { removeContainerRequest((CookieContainerRequest) any()); verify(mockRMClient, times(8)).addContainerRequest( (CookieContainerRequest) any()); - assertFalse(scheduler.deallocateTask(mockTask1, true)); + assertFalse(scheduler.deallocateTask(mockTask1, true, null)); List mockUpdatedNodes = mock(List.class); scheduler.onNodesUpdated(mockUpdatedNodes); @@ -741,7 +741,7 @@ public class TestTaskScheduler { verify(mockRMClient).releaseAssignedContainer(mockCId4); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask1, true)); + assertTrue(scheduler.deallocateTask(mockTask1, true, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId1); verify(mockRMClient).releaseAssignedContainer(mockCId1); @@ -871,7 +871,7 @@ public class TestTaskScheduler { verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any()); verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask4, true)); + assertTrue(scheduler.deallocateTask(mockTask4, true, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId6); verify(mockRMClient).releaseAssignedContainer(mockCId6); @@ -960,8 +960,8 @@ public class TestTaskScheduler { // container7 allocated to the task with affinity for it verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask5, true)); - assertTrue(scheduler.deallocateTask(mockTask6, true)); + assertTrue(scheduler.deallocateTask(mockTask5, true, null)); + assertTrue(scheduler.deallocateTask(mockTask6, true, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId7); verify(mockApp).containerBeingReleased(mockCId8); http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 5657f86..872d592 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; import org.apache.tez.service.TezTestServiceConfConstants; @@ -198,7 +199,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } @Override - public boolean deallocateTask(Object task, boolean taskSucceeded) { + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) { ContainerId containerId = runningTasks.remove(task); if (containerId == null) { LOG.error("Could not determine ContainerId for task: " + task + http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index a327caf..e3385a2 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -19,16 +19,20 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.RejectedExecutionException; import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.app.TezTestServiceCommunicator; @@ -83,6 +87,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl @Override public void serviceStop() { super.serviceStop(); + this.communicator.stop(); } @@ -123,13 +128,15 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl throw new RuntimeException("ContainerInfo not found for container: " + containerId + ", while trying to launch task: " + taskSpec.getTaskAttemptID()); } + // Have to register this up front right now. Otherwise, it's possible for the task to start + // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. + getTaskCommunicatorContext() + .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); communicator.submitWork(requestProto, host, port, new TezTestServiceCommunicator.ExecuteRequestCallback() { @Override public void setResponse(SubmitWorkResponseProto response) { LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID()); - getTaskCommunicatorContext() - .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); } @Override @@ -137,6 +144,31 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in. LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); + if (t instanceof ServiceException) { + ServiceException se = (ServiceException) t; + t = se.getCause(); + } + if (t instanceof RemoteException) { + RemoteException re = (RemoteException)t; + String message = re.toString(); + if (message.contains(RejectedExecutionException.class.getName())) { + getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), + TaskAttemptEndReason.SERVICE_BUSY, "Service Busy"); + } else { + getTaskCommunicatorContext() + .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + t.toString()); + } + } else { + if (t instanceof IOException) { + getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), + TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); + } else { + getTaskCommunicatorContext() + .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + t.getMessage()); + } + } } }); } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java index 2bca4ed..28c2286 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java @@ -16,12 +16,13 @@ package org.apache.tez.service; import java.io.IOException; +import org.apache.tez.dag.api.TezException; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto; public interface ContainerRunner { - void queueContainer(RunContainerRequestProto request) throws IOException; - void submitWork(SubmitWorkRequestProto request) throws IOException; + void queueContainer(RunContainerRequestProto request) throws TezException; + void submitWork(SubmitWorkRequestProto request) throws TezException; } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java index f47bd67..0ac0b33 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java @@ -133,7 +133,10 @@ public class MiniTezTestServiceCluster extends AbstractService { @Override public void serviceStop() { - tezTestService.stop(); + if (tezTestService != null) { + tezTestService.stop(); + tezTestService = null; + } } /** http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index 25d6030..379d952 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -58,6 +59,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.task.TaskReporter; import org.apache.tez.runtime.task.TezTaskRunner; import org.apache.tez.service.ContainerRunner; @@ -68,14 +70,18 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.task.TezChild; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; import org.apache.tez.shufflehandler.ShuffleHandler; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto; import org.apache.tez.util.ProtoConverters; public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); + public static final String DAG_NAME_INSTRUMENTED_FAILURES = "InstrumentedFailures"; + private final ListeningExecutorService executorService; private final AtomicReference localAddress; private final String[] localDirsBase; @@ -146,10 +152,10 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun * Submit a container which is ready for running. * The regular pull mechanism will be used to fetch work from the AM * @param request - * @throws IOException + * @throws TezException */ @Override - public void queueContainer(RunContainerRequestProto request) throws IOException { + public void queueContainer(RunContainerRequestProto request) throws TezException { LOG.info("Queuing container for execution: " + request); Map env = new HashMap(); @@ -162,7 +168,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun for (int i = 0; i < localDirsBase.length; i++) { localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(), request.getUser()); - localFs.mkdirs(new Path(localDirs[i])); + try { + localFs.mkdirs(new Path(localDirs[i])); + } catch (IOException e) { + throw new TezException(e); + } } LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs)); @@ -175,7 +185,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun DataInputBuffer dib = new DataInputBuffer(); byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); dib.reset(tokenBytes, tokenBytes.length); - credentials.readTokenStorageStream(dib); + try { + credentials.readTokenStorageStream(dib); + } catch (IOException e) { + throw new TezException(e); + } Token jobToken = TokenCache.getSessionToken(credentials); @@ -197,13 +211,14 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun * This is intended for a task push from the AM * * @param request - * @throws IOException + * @throws org.apache.tez.dag.api.TezException */ @Override - public void submitWork(SubmitWorkRequestProto request) throws - IOException { + public void submitWork(SubmitWorkRequestProto request) throws TezException { LOG.info("Queuing work for execution: " + request); + checkAndThrowExceptionForTests(request); + Map env = new HashMap(); env.putAll(localEnv); env.put(ApplicationConstants.Environment.USER.name(), request.getUser()); @@ -214,7 +229,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun for (int i = 0; i < localDirsBase.length; i++) { localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(), request.getUser()); - localFs.mkdirs(new Path(localDirs[i])); + try { + localFs.mkdirs(new Path(localDirs[i])); + } catch (IOException e) { + throw new TezException(e); + } } if (LOG.isDebugEnabled()) { LOG.debug("Dirs are: " + Arrays.toString(localDirs)); @@ -228,7 +247,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun DataInputBuffer dib = new DataInputBuffer(); byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); dib.reset(tokenBytes, tokenBytes.length); - credentials.readTokenStorageStream(dib); + try { + credentials.readTokenStorageStream(dib); + } catch (IOException e) { + throw new TezException(e); + } Token jobToken = TokenCache.getSessionToken(credentials); @@ -509,4 +532,23 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun } } + + private void checkAndThrowExceptionForTests(SubmitWorkRequestProto request) throws TezException { + if (!request.getTaskSpec().getDagName().equals(DAG_NAME_INSTRUMENTED_FAILURES)) { + return; + } + + TaskSpec taskSpec = ProtoConverters.getTaskSpecfromProto(request.getTaskSpec()); + if (taskSpec.getTaskAttemptID().getTaskID().getId() == 0 && + taskSpec.getTaskAttemptID().getId() == 0) { + LOG.info("Simulating Rejected work"); + throw new RejectedExecutionException( + "Simulating Rejected work for taskAttemptId=" + taskSpec.getTaskAttemptID()); + } else if (taskSpec.getTaskAttemptID().getTaskID().getId() == 1 && + taskSpec.getTaskAttemptID().getId() == 0) { + LOG.info("Simulating Task Setup Failure during launch"); + throw new TezException("Simulating Task Setup Failure during launch for taskAttemptId=" + + taskSpec.getTaskAttemptID()); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java index 012e352..855f1b0 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java @@ -14,7 +14,6 @@ package org.apache.tez.service.impl; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; +import org.apache.tez.dag.api.TezException; import org.apache.tez.service.ContainerRunner; import org.apache.tez.shufflehandler.ShuffleHandler; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos; @@ -108,14 +108,14 @@ public class TezTestService extends AbstractService implements ContainerRunner { @Override - public void queueContainer(RunContainerRequestProto request) throws IOException { + public void queueContainer(RunContainerRequestProto request) throws TezException { numSubmissions.incrementAndGet(); containerRunner.queueContainer(request); } @Override public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws - IOException { + TezException { numSubmissions.incrementAndGet(); containerRunner.submitWork(request); } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java index d7f8444..39d7156 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java @@ -30,11 +30,13 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; +import org.apache.tez.dag.api.TezException; import org.apache.tez.service.ContainerRunner; import org.apache.tez.service.TezTestServiceProtocolBlockingPB; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto; +import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto; import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto; public class TezTestServiceProtocolServerImpl extends AbstractService @@ -61,20 +63,20 @@ public class TezTestServiceProtocolServerImpl extends AbstractService LOG.info("Received request: " + request); try { containerRunner.queueContainer(request); - } catch (IOException e) { + } catch (TezException e) { throw new ServiceException(e); } return RunContainerResponseProto.getDefaultInstance(); } @Override - public SubmitWorkResponseProto submitWork(RpcController controller, TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws + public SubmitWorkResponseProto submitWork(RpcController controller, SubmitWorkRequestProto request) throws ServiceException { LOG.info("Received submitWork request: " + request); try { containerRunner.submitWork(request); - } catch (IOException e) { - e.printStackTrace(); + } catch (TezException e) { + throw new ServiceException(e); } return SubmitWorkResponseProto.getDefaultInstance(); } http://git-wip-us.apache.org/repos/asf/tez/blob/7db5a8d0/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 0ec972b..b6a166d 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -27,16 +27,23 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.JoinDataGen; import org.apache.tez.examples.JoinValidateConfigured; +import org.apache.tez.runtime.library.processor.SleepProcessor; import org.apache.tez.service.MiniTezTestServiceCluster; +import org.apache.tez.service.impl.ContainerRunnerImpl; import org.apache.tez.test.MiniTezCluster; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -283,6 +290,28 @@ public class TestExternalTezServices { PROPS_IN_AM, PROPS_REGULAR_CONTAINERS); } + @Test(timeout = 60000) + public void testErrorPropagation() throws TezException, InterruptedException, IOException { + runExceptionSimulation(); + } + + + + private void runExceptionSimulation() throws IOException, TezException, InterruptedException { + DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES); + Vertex v =Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()), + 3); + for (Map.Entry prop : PROPS_EXT_SERVICE_PUSH.entrySet()) { + v.setConf(prop.getKey(), prop.getValue()); + } + dag.addVertex(v); + DAGClient dagClient = sharedTezClient.submitDAG(dag); + DAGStatus dagStatus = dagClient.waitForCompletion(); + assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + assertEquals(1, dagStatus.getDAGProgress().getFailedTaskAttemptCount()); + assertEquals(1, dagStatus.getDAGProgress().getKilledTaskAttemptCount()); + + } private void runJoinValidate(String name, int extExpectedCount, Map lhsProps, Map rhsProps,