Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DDD7517F22 for ; Mon, 11 May 2015 22:38:16 +0000 (UTC) Received: (qmail 58768 invoked by uid 500); 11 May 2015 22:38:16 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 58575 invoked by uid 500); 11 May 2015 22:38:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 58436 invoked by uid 99); 11 May 2015 22:38:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 May 2015 22:38:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5C70CE10A2; Mon, 11 May 2015 22:38:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jlowe@apache.org To: common-commits@hadoop.apache.org Date: Mon, 11 May 2015 22:38:17 -0000 Message-Id: <5afa08daad814379844c8f9f9f1f1fb6@git.apache.org> In-Reply-To: <99a8b90e76934f268d3bb0e9a395e9fa@git.apache.org> References: <99a8b90e76934f268d3bb0e9a395e9fa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma MAPREDUCE-5465. Tasks are often killed before they exit on their own. 
Contributed by Ming Ma Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/444836b3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/444836b3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/444836b3 Branch: refs/heads/trunk Commit: 444836b3dcd3ee28238af7b5e753d644e8095788 Parents: 1952f88 Author: Jason Lowe Authored: Mon May 11 22:37:35 2015 +0000 Committer: Jason Lowe Committed: Mon May 11 22:37:35 2015 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/LocalContainerLauncher.java | 12 +- .../hadoop/mapreduce/v2/app/AppContext.java | 2 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 38 +- .../v2/app/TaskAttemptFinishingMonitor.java | 63 +++ .../v2/app/client/MRClientService.java | 2 +- .../v2/app/job/TaskAttemptStateInternal.java | 39 +- .../v2/app/job/event/TaskAttemptEventType.java | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 445 +++++++++++++++---- .../v2/app/launcher/ContainerLauncher.java | 8 +- .../v2/app/launcher/ContainerLauncherImpl.java | 11 +- .../mapred/TestTaskAttemptFinishingMonitor.java | 108 +++++ .../apache/hadoop/mapreduce/v2/app/MRApp.java | 16 + .../hadoop/mapreduce/v2/app/MockAppContext.java | 6 + .../hadoop/mapreduce/v2/app/TestFail.java | 2 + .../hadoop/mapreduce/v2/app/TestKill.java | 142 ++++-- .../mapreduce/v2/app/TestRuntimeEstimators.java | 5 + .../v2/app/job/impl/TestTaskAttempt.java | 263 ++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 10 +- .../src/main/resources/mapred-default.xml | 20 + .../hadoop/mapreduce/v2/hs/JobHistory.java | 6 + .../v2/TestSpeculativeExecutionWithMRApp.java | 6 +- 22 files changed, 1060 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 83d1704..f48f847 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -408,6 +408,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6353. Divide by zero error in MR AM when calculating available containers. (Anubhav Dhoot via kasha) + MAPREDUCE-5465. Tasks are often killed before they exit on their own + (Ming Ma via jlowe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index ffc5326..52b3497 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -264,7 +264,8 @@ public class LocalContainerLauncher extends AbstractService implements context.getEventHandler().handle( new TaskAttemptEvent(taId, TaskAttemptEventType.TA_CONTAINER_CLEANED)); - + } else if (event.getType() == EventType.CONTAINER_COMPLETED) { + LOG.debug("Container completed " + event.toString()); } else { LOG.warn("Ignoring unexpected event " + event.toString()); } @@ -314,7 +315,14 @@ public class LocalContainerLauncher extends AbstractService implements } runSubtask(remoteTask, 
ytask.getType(), attemptID, numMapTasks, (numReduceTasks > 0), localMapFiles); - + + // In non-uber mode, TA gets TA_CONTAINER_COMPLETED from MRAppMaster + // as part of NM -> RM -> AM notification route. + // In uber mode, given the task run inside the MRAppMaster container, + // we have to simulate the notification. + context.getEventHandler().handle(new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + } catch (RuntimeException re) { JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index 31e282a..4af11c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -67,4 +67,6 @@ public interface AppContext { boolean hasSuccessfullyUnregistered(); String getNMHostname(); + + TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index a5c9a25..95baaa3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -207,6 +207,14 @@ public class MRAppMaster extends CompositeService { private SpeculatorEventDispatcher speculatorEventDispatcher; private AMPreemptionPolicy preemptionPolicy; + // After a task attempt completes from TaskUmbilicalProtocol's point of view, + // it will be transitioned to finishing state. + // taskAttemptFinishingMonitor is just a timer for attempts in finishing + // state. If the attempt stays in finishing state for too long, + // taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT + // event. 
+ private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; + private Job job; private Credentials jobCredentials = new Credentials(); // Filled during init protected UserGroupInformation currentUser; // Will be setup during init @@ -249,6 +257,12 @@ public class MRAppMaster extends CompositeService { logSyncer = TaskLog.createLogSyncer(); LOG.info("Created MRAppMaster for application " + applicationAttemptId); } + protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor( + EventHandler eventHandler) { + TaskAttemptFinishingMonitor monitor = + new TaskAttemptFinishingMonitor(eventHandler); + return monitor; + } @Override protected void serviceInit(final Configuration conf) throws Exception { @@ -259,7 +273,11 @@ public class MRAppMaster extends CompositeService { initJobCredentialsAndUGI(conf); - context = new RunningAppContext(conf); + dispatcher = createDispatcher(); + addIfService(dispatcher); + taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler()); + addIfService(taskAttemptFinishingMonitor); + context = new RunningAppContext(conf, taskAttemptFinishingMonitor); // Job name is the same as the app name util we support DAG of jobs // for an app later @@ -326,9 +344,6 @@ public class MRAppMaster extends CompositeService { } if (errorHappenedShutDown) { - dispatcher = createDispatcher(); - addIfService(dispatcher); - NoopEventHandler eater = new NoopEventHandler(); //We do not have a JobEventDispatcher in this path dispatcher.register(JobEventType.class, eater); @@ -375,9 +390,6 @@ public class MRAppMaster extends CompositeService { } else { committer = createOutputCommitter(conf); - dispatcher = createDispatcher(); - addIfService(dispatcher); - //service to handle requests from JobClient clientService = createClientService(context); // Init ClientService separately so that we stop it separately, since this @@ -965,10 +977,14 @@ public class MRAppMaster extends CompositeService { private final ClusterInfo 
clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; - public RunningAppContext(Configuration config) { + private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; + + public RunningAppContext(Configuration config, + TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) { this.conf = config; this.clientToAMTokenSecretManager = new ClientToAMTokenSecretManager(appAttemptID, null); + this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; } @Override @@ -1053,6 +1069,12 @@ public class MRAppMaster extends CompositeService { public String getNMHostname() { return nmHost; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return taskAttemptFinishingMonitor; + } + } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java new file mode 100644 index 0000000..f603398 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.SystemClock; + +/** + * This class generates TA_TIMED_OUT if the task attempt stays in FINISHING + * state for too long. 
+ */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class TaskAttemptFinishingMonitor extends + AbstractLivelinessMonitor { + + private EventHandler eventHandler; + + public TaskAttemptFinishingMonitor(EventHandler eventHandler) { + super("TaskAttemptFinishingMonitor", new SystemClock()); + this.eventHandler = eventHandler; + } + + public void init(Configuration conf) { + super.init(conf); + int expireIntvl = conf.getInt(MRJobConfig.TASK_EXIT_TIMEOUT, + MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); + int checkIntvl = conf.getInt( + MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, + MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT); + + setExpireInterval(expireIntvl); + setMonitorInterval(checkIntvl); + } + + @Override + protected void expire(TaskAttemptId id) { + eventHandler.handle( + new TaskAttemptEvent(id, + TaskAttemptEventType.TA_TIMED_OUT)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index ceb1dbf..d378b0a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -370,7 +370,7 @@ public class MRClientService extends AbstractService implements ClientService { new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); 
appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, - TaskAttemptEventType.TA_FAILMSG)); + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)); FailTaskAttemptResponse response = recordFactory. newRecordInstance(FailTaskAttemptResponse.class); return response; http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java index f6c3e57..5f17651 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java @@ -30,9 +30,42 @@ public enum TaskAttemptStateInternal { UNASSIGNED, ASSIGNED, RUNNING, - COMMIT_PENDING, - SUCCESS_CONTAINER_CLEANUP, - SUCCEEDED, + COMMIT_PENDING, + + // Transition into SUCCESS_FINISHING_CONTAINER + // After the attempt finishes successfully from + // TaskUmbilicalProtocol's point of view, it will transition to + // SUCCESS_FINISHING_CONTAINER state. That will give a chance for the + // container to exit by itself. In the transition, + // the attempt will notify the task via T_ATTEMPT_SUCCEEDED so that + // from job point of view, the task is considered succeeded. 
+ + // Transition out of SUCCESS_FINISHING_CONTAINER + // The attempt will transition from SUCCESS_FINISHING_CONTAINER to + // SUCCESS_CONTAINER_CLEANUP if it doesn't receive container exit + // notification within TASK_EXIT_TIMEOUT; + // Or it will transition to SUCCEEDED if it receives container exit + // notification from YARN. + SUCCESS_FINISHING_CONTAINER, + + // Transition into FAIL_FINISHING_CONTAINER + // After the attempt fails from + // TaskUmbilicalProtocol's point of view, it will transition to + // FAIL_FINISHING_CONTAINER state. That will give a chance for the container + // to exit by itself. In the transition, + // the attempt will notify the task via T_ATTEMPT_FAILED so that + // from the job's point of view, the task is considered failed. + + // Transition out of FAIL_FINISHING_CONTAINER + // The attempt will transition from FAIL_FINISHING_CONTAINER to + // FAIL_CONTAINER_CLEANUP if it doesn't receive container exit + // notification within TASK_EXIT_TIMEOUT; + // Or it will transition to FAILED if it receives container exit + // notification from YARN.
+ FAIL_FINISHING_CONTAINER, + + SUCCESS_CONTAINER_CLEANUP, + SUCCEEDED, FAIL_CONTAINER_CLEANUP, FAIL_TASK_CLEANUP, FAILED, http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java index 1f05ac3..61de032 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java @@ -49,6 +49,9 @@ public enum TaskAttemptEventType { TA_TIMED_OUT, TA_PREEMPTED, + //Producer:Client + TA_FAILMSG_BY_CLIENT, + //Producer:TaskCleaner TA_CLEANUP_DONE, http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index f4b434b..7e82df2 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -184,8 +184,20 @@ public abstract class TaskAttemptImpl implements private Locality locality; private Avataar avataar; - private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = - new CleanupContainerTransition(); + private static final CleanupContainerTransition + CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); + private static final MoveContainerToSucceededFinishingTransition + SUCCEEDED_FINISHING_TRANSITION = + new MoveContainerToSucceededFinishingTransition(); + private static final MoveContainerToFailedFinishingTransition + FAILED_FINISHING_TRANSITION = + new MoveContainerToFailedFinishingTransition(); + private static final ExitFinishingOnTimeoutTransition + FINISHING_ON_TIMEOUT_TRANSITION = + new ExitFinishingOnTimeoutTransition(); + + private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION = + new FinalizeFailedTransition(); private static final DiagnosticInformationUpdater DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION @@ -204,6 +216,8 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE); private static final StateMachineFactory @@ -221,16 +235,16 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new KilledTransition()) .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, - TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) 
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition()) .addTransition(TaskAttemptStateInternal.NEW, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_RECOVER, new RecoverTransition()) .addTransition(TaskAttemptStateInternal.NEW, - TaskAttemptStateInternal.NEW, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, - DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + TaskAttemptStateInternal.NEW, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Transitions from the UNASSIGNED state. .addTransition(TaskAttemptStateInternal.UNASSIGNED, @@ -238,14 +252,14 @@ public abstract class TaskAttemptImpl implements new ContainerAssignedTransition()) .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( - TaskAttemptStateInternal.KILLED, true)) + TaskAttemptStateInternal.KILLED, true)) .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED, - TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition( TaskAttemptStateInternal.FAILED, true)) .addTransition(TaskAttemptStateInternal.UNASSIGNED, - TaskAttemptStateInternal.UNASSIGNED, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, - DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + TaskAttemptStateInternal.UNASSIGNED, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Transitions from the ASSIGNED state. 
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING, @@ -258,15 +272,19 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false)) .addTransition(TaskAttemptStateInternal.ASSIGNED, - TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, - CLEANUP_CONTAINER_TRANSITION) + FINALIZE_FAILED_TRANSITION) .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) - .addTransition(TaskAttemptStateInternal.ASSIGNED, + .addTransition(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION) + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + CLEANUP_CONTAINER_TRANSITION) // Transitions from RUNNING state. 
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, @@ -274,23 +292,27 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) - // If no commit is required, task directly goes to success + // If no commit is required, task goes to finishing state + // This will give a chance for the container to exit by itself .addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION) // If commit is required, task goes through commit pending state. .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) // Failure handling while RUNNING .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION) + .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION) //for handling container exit without sending the done or fail msg .addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, - CLEANUP_CONTAINER_TRANSITION) + FINALIZE_FAILED_TRANSITION) // Timeout handling while RUNNING .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, @@ -301,12 +323,97 @@ public abstract class TaskAttemptImpl implements 
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) // Kill handling .addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition()) + // Transitions from SUCCESS_FINISHING_CONTAINER state + // When the container exits by itself, the notification of container + // completed event will be routed via NM -> RM -> AM. + // After MRAppMaster gets notification from RM, it will generate + // TA_CONTAINER_COMPLETED event. + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_COMPLETED, + new ExitFinishingOnContainerCompletedTransition()) + // Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to + // SUCCESS_FINISHING_CONTAINER, it is possible to receive the event + // TA_CONTAINER_CLEANED in the following scenario. + // 1. It is the last task for the job. + // 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job. + // 3. Job will be marked completed. + // 4. As part of MRAppMaster's shutdown, all containers will be killed. + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, + new ExitFinishingOnContainerCleanedupTransition()) + // The client wants to kill the task. Given the task is in finishing + // state, it could go to succeeded state or killed state. If it is a + // reducer, it will go to succeeded state; + // otherwise, it goes to killed state. 
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP), + TaskAttemptEventType.TA_KILL, + new KilledAfterSucceededFinishingTransition()) + // The attempt stays in finishing state for too long + // Let us clean up the container + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION) + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + // ignore-able events + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + EnumSet.of(TaskAttemptEventType.TA_UPDATE, + TaskAttemptEventType.TA_DONE, + TaskAttemptEventType.TA_COMMIT_PENDING, + TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)) + + // Transitions from FAIL_FINISHING_CONTAINER state + // When the container exits by itself, the notification of container + // completed event will be routed via NM -> RM -> AM. + // After MRAppMaster gets notification from RM, it will generate + // TA_CONTAINER_COMPLETED event. + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_CONTAINER_COMPLETED, + new ExitFinishingOnContainerCompletedTransition()) + // Given TA notifies task T_ATTEMPT_FAILED when it transitions to + // FAIL_FINISHING_CONTAINER, it is possible to receive the event + // TA_CONTAINER_CLEANED in the following scenario. + // 1. It is the last task attempt for the task. + // 2. After the task receives T_ATTEMPT_FAILED, it will notify job. + // 3. Job will be marked failed. + // 4. 
As part of MRAppMaster's shutdown, all containers will be killed. + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, + new ExitFinishingOnContainerCleanedupTransition()) + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION) + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + // ignore-able events + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + EnumSet.of(TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_UPDATE, + TaskAttemptEventType.TA_DONE, + TaskAttemptEventType.TA_COMMIT_PENDING, + TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)) + // Transitions from COMMIT_PENDING state .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, @@ -316,22 +423,27 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, - TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, - TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) // if container killed by AM shutting down 
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, - TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + CLEANUP_CONTAINER_TRANSITION) + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, - CLEANUP_CONTAINER_TRANSITION) + FINALIZE_FAILED_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) @@ -348,8 +460,8 @@ public abstract class TaskAttemptImpl implements // Transitions from SUCCESS_CONTAINER_CLEANUP state // kill and cleanup the container .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, - new SucceededTransition()) + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_CLEANED) .addTransition( TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, @@ -360,6 +472,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) @@ -383,6 +496,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, 
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_TIMED_OUT)) // Transitions from KILL_CONTAINER_CLEANUP @@ -405,6 +519,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_TIMED_OUT)) // Transitions from FAIL_TASK_CLEANUP @@ -425,6 +540,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_CONTAINER_CLEANED, // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, @@ -447,6 +563,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_PREEMPTED, // Container launch events can arrive late @@ -460,7 +577,7 @@ public abstract class TaskAttemptImpl implements new TooManyFetchFailureTransition()) .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED), - TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_KILL, new KilledAfterSuccessTransition()) .addTransition( TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, @@ -470,6 +587,10 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + // TaskAttemptFinishingMonitor might time out the attempt right + // after the attempt receives TA_CONTAINER_COMPLETED. 
+ TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) @@ -1213,21 +1334,21 @@ public abstract class TaskAttemptImpl implements return TaskAttemptState.STARTING; case COMMIT_PENDING: return TaskAttemptState.COMMIT_PENDING; - case FAILED: - return TaskAttemptState.FAILED; - case KILLED: - return TaskAttemptState.KILLED; - // All CLEANUP states considered as RUNNING since events have not gone out - // to the Task yet. May be possible to consider them as a Finished state. case FAIL_CONTAINER_CLEANUP: case FAIL_TASK_CLEANUP: + case FAIL_FINISHING_CONTAINER: + case FAILED: + return TaskAttemptState.FAILED; case KILL_CONTAINER_CLEANUP: case KILL_TASK_CLEANUP: - case SUCCESS_CONTAINER_CLEANUP: + case KILLED: + return TaskAttemptState.KILLED; case RUNNING: return TaskAttemptState.RUNNING; case NEW: return TaskAttemptState.NEW; + case SUCCESS_CONTAINER_CLEANUP: + case SUCCESS_FINISHING_CONTAINER: case SUCCEEDED: return TaskAttemptState.SUCCEEDED; default: @@ -1429,6 +1550,15 @@ public abstract class TaskAttemptImpl implements } } + private static void finalizeProgress(TaskAttemptImpl taskAttempt) { + // unregister it to TaskAttemptListener so that it stops listening + taskAttempt.taskAttemptListener.unregister( + taskAttempt.attemptId, taskAttempt.jvmID); + taskAttempt.reportedStatus.progress = 1.0f; + taskAttempt.updateProgressSplits(); + } + + static class RequestContainerTransition implements SingleArcTransition { private final boolean rescheduled; @@ -1661,53 +1791,66 @@ public abstract class TaskAttemptImpl implements } } - private static class SucceededTransition implements + /** + * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER + * state upon receiving TA_CONTAINER_COMPLETED event + */ + private static class ExitFinishingOnContainerCompletedTransition implements SingleArcTransition { @SuppressWarnings("unchecked") @Override - public void transition(TaskAttemptImpl 
taskAttempt, + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + sendContainerCompleted(taskAttempt); + } + } + + private static class ExitFinishingOnContainerCleanedupTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { - //set the finish time - taskAttempt.setFinishTime(); - taskAttempt.eventHandler.handle( - createJobCounterUpdateEventTASucceeded(taskAttempt)); - taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_SUCCEEDED)); - taskAttempt.eventHandler.handle - (new SpeculatorEvent - (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); - } + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + } } private static class FailedTransition implements SingleArcTransition { @SuppressWarnings("unchecked") @Override - public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { // set the finish time taskAttempt.setFinishTime(); - - if (taskAttempt.getLaunchTime() != 0) { - taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); - TaskAttemptUnsuccessfulCompletionEvent tauce = - createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptStateInternal.FAILED); - taskAttempt.eventHandler.handle(new JobHistoryEvent( - taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not - // handling failed map/reduce events. 
- }else { - LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); - } - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + notifyTaskAttemptFailed(taskAttempt); } } + private static class FinalizeFailedTransition extends FailedTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + finalizeProgress(taskAttempt); + sendContainerCompleted(taskAttempt); + super.transition(taskAttempt, event); + } + } + + @SuppressWarnings("unchecked") + private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) { + taskAttempt.eventHandler.handle(new ContainerLauncherEvent( + taskAttempt.attemptId, + taskAttempt.container.getId(), StringInterner + .weakIntern(taskAttempt.container.getNodeId().toString()), + taskAttempt.container.getContainerToken(), + ContainerLauncher.EventType.CONTAINER_COMPLETED)); + } + private static class RecoverTransition implements MultipleArcTransition { @@ -1832,6 +1975,35 @@ public abstract class TaskAttemptImpl implements } } + private static class KilledAfterSucceededFinishingTransition + implements MultipleArcTransition { + + @SuppressWarnings("unchecked") + @Override + public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + sendContainerCleanup(taskAttempt, event); + if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) { + // after a reduce task has succeeded, its outputs are in safe in HDFS. + // logically such a task should not be killed. we only come here when + // there is a race condition in the event queue. E.g. some logic sends + // a kill request to this attempt when the successful completion event + // for this task is already in the event queue. 
so the kill event will + // get executed immediately after the attempt is marked successful and + // result in this transition being exercised. + // ignore this for reduce tasks + LOG.info("Ignoring killed event for successful reduce task attempt" + + taskAttempt.getID().toString()); + return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP; + } else { + return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP; + } + } + } + private static class KilledTransition implements SingleArcTransition { @@ -1887,6 +2059,31 @@ public abstract class TaskAttemptImpl implements } } + /** + * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER + * state upon receiving TA_TIMED_OUT event + */ + private static class ExitFinishingOnTimeoutTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + // The attempt stays in finishing state for too long + String msg = "Task attempt " + taskAttempt.getID() + " is done from " + + "TaskUmbilicalProtocol's point of view. However, it stays in " + + "finishing state for too long"; + LOG.warn(msg); + taskAttempt.addDiagnosticInfo(msg); + sendContainerCleanup(taskAttempt, event); + } + } + + /** + * Finish and clean up the container + */ private static class CleanupContainerTransition implements SingleArcTransition { @SuppressWarnings("unchecked") @@ -1894,27 +2091,103 @@ public abstract class TaskAttemptImpl implements public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // unregister it to TaskAttemptListener so that it stops listening - // for it - taskAttempt.taskAttemptListener.unregister( - taskAttempt.attemptId, taskAttempt.jvmID); + // for it. 
+ finalizeProgress(taskAttempt); + sendContainerCleanup(taskAttempt, event); + } + } - if (event instanceof TaskAttemptKillEvent) { - taskAttempt.addDiagnosticInfo( - ((TaskAttemptKillEvent) event).getMessage()); - } + @SuppressWarnings("unchecked") + private static void sendContainerCleanup(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.addDiagnosticInfo( + ((TaskAttemptKillEvent) event).getMessage()); + } + //send the cleanup event to containerLauncher + taskAttempt.eventHandler.handle(new ContainerLauncherEvent( + taskAttempt.attemptId, + taskAttempt.container.getId(), StringInterner + .weakIntern(taskAttempt.container.getNodeId().toString()), + taskAttempt.container.getContainerToken(), + ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); + } + + /** + * Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event + */ + private static class MoveContainerToSucceededFinishingTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + finalizeProgress(taskAttempt); + + // register it to finishing state + taskAttempt.appContext.getTaskAttemptFinishingMonitor().register( + taskAttempt.attemptId); + + // set the finish time + taskAttempt.setFinishTime(); + + // notify job history + taskAttempt.eventHandler.handle( + createJobCounterUpdateEventTASucceeded(taskAttempt)); + taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); + + //notify the task even though the container might not have exited yet. 
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent( + taskAttempt.attemptId, + TaskEventType.T_ATTEMPT_SUCCEEDED)); + taskAttempt.eventHandler.handle + (new SpeculatorEvent + (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); - taskAttempt.reportedStatus.progress = 1.0f; - taskAttempt.updateProgressSplits(); - //send the cleanup event to containerLauncher - taskAttempt.eventHandler.handle(new ContainerLauncherEvent( - taskAttempt.attemptId, - taskAttempt.container.getId(), StringInterner - .weakIntern(taskAttempt.container.getNodeId().toString()), - taskAttempt.container.getContainerToken(), - ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); } } + /** + * Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event + */ + private static class MoveContainerToFailedFinishingTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + finalizeProgress(taskAttempt); + // register it to finishing state + taskAttempt.appContext.getTaskAttemptFinishingMonitor().register( + taskAttempt.attemptId); + notifyTaskAttemptFailed(taskAttempt); + } + } + + @SuppressWarnings("unchecked") + private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) { + // set the finish time + taskAttempt.setFinishTime(); + + if (taskAttempt.getLaunchTime() != 0) { + taskAttempt.eventHandler + .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, + TaskAttemptStateInternal.FAILED); + taskAttempt.eventHandler.handle(new JobHistoryEvent( + taskAttempt.attemptId.getTaskId().getJobId(), tauce)); + // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not + // handling failed map/reduce events. 
+ }else { + LOG.debug("Not generating HistoryFinish event since start event not " + + "generated for taskAttempt: " + taskAttempt.getID()); + } + taskAttempt.eventHandler.handle(new TaskTAttemptEvent( + taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + + } + private void addDiagnosticInfo(String diag) { if (diag != null && !diag.equals("")) { diagnostics.add(diag); http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java index 40ecdb2..82360f0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java @@ -27,7 +27,13 @@ public interface ContainerLauncher enum EventType { CONTAINER_REMOTE_LAUNCH, - CONTAINER_REMOTE_CLEANUP + CONTAINER_REMOTE_CLEANUP, + // When TaskAttempt receives TA_CONTAINER_COMPLETED, + // it will notify ContainerLauncher so that the container can be removed + // from ContainerLauncher's launched containers list + // Otherwise, ContainerLauncher will try to stop the containers as part of + // serviceStop. 
+ CONTAINER_COMPLETED } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 9c1125d..a7e966c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -121,7 +121,11 @@ public class ContainerLauncherImpl extends AbstractService implements public synchronized boolean isCompletelyDone() { return state == ContainerState.DONE || state == ContainerState.FAILED; } - + + public synchronized void done() { + state = ContainerState.DONE; + } + @SuppressWarnings("unchecked") public synchronized void launch(ContainerRemoteLaunchEvent event) { LOG.info("Launching " + taskAttemptID); @@ -378,6 +382,11 @@ public class ContainerLauncherImpl extends AbstractService implements case CONTAINER_REMOTE_CLEANUP: c.kill(); break; + + case CONTAINER_COMPLETED: + c.done(); + break; + } removeContainerIfDone(containerID); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java new file mode 100644 index 0000000..800f0e2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java @@ -0,0 +1,108 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.mapred; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.SystemClock; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestTaskAttemptFinishingMonitor { + + @Test + public void testFinshingAttemptTimeout() + throws IOException, InterruptedException { + SystemClock clock = new SystemClock(); + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100); + conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); + + AppContext appCtx = mock(AppContext.class); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = + new TaskAttemptFinishingMonitor(eventHandler); + taskAttemptFinishingMonitor.init(conf); + taskAttemptFinishingMonitor.start(); 
+ + when(appCtx.getEventHandler()).thenReturn(eventHandler); + when(appCtx.getNMHostname()).thenReturn("0.0.0.0"); + when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn( + taskAttemptFinishingMonitor); + when(appCtx.getClock()).thenReturn(clock); + + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + TaskAttemptListenerImpl listener = + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, policy); + + listener.init(conf); + listener.start(); + + JobId jid = MRBuilderUtils.newJobId(12345, 1, 1); + TaskId tid = MRBuilderUtils.newTaskId(jid, 0, + org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0); + appCtx.getTaskAttemptFinishingMonitor().register(attemptId); + int check = 0; + while ( !eventHandler.timedOut && check++ < 10 ) { + Thread.sleep(100); + } + taskAttemptFinishingMonitor.stop(); + + assertTrue("Finishing attempt didn't time out.", eventHandler.timedOut); + + } + + public static class MockEventHandler implements EventHandler { + public boolean timedOut = false; + + @Override + public void handle(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent attemptEvent = ((TaskAttemptEvent) event); + if (TaskAttemptEventType.TA_TIMED_OUT == attemptEvent.getType()) { + timedOut = true; + } + } + } + }; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 58db925..4fe4c44 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -482,6 +482,20 @@ public class MRApp extends MRAppMaster { } @Override + protected TaskAttemptFinishingMonitor + createTaskAttemptFinishingMonitor( + EventHandler eventHandler) { + return new TaskAttemptFinishingMonitor(eventHandler) { + @Override + public synchronized void register(TaskAttemptId attemptID) { + getContext().getEventHandler().handle( + new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + } + }; + } + + @Override protected TaskAttemptListener createTaskAttemptListener( AppContext context, AMPreemptionPolicy policy) { return new TaskAttemptListener(){ @@ -541,6 +555,8 @@ public class MRApp extends MRAppMaster { new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); break; + case CONTAINER_COMPLETED: + break; } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index a900241..e690f3f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -148,4 +148,10 @@ public class MockAppContext implements AppContext { // bogus - Not Required return null; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 4a36938..4d3f6f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -223,6 +223,8 @@ public class TestFail { new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); break; + case CONTAINER_COMPLETED: + super.handle(event); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index c33bd4d..aae591e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -159,7 +159,7 @@ public class TestKill { super.dispatch(new TaskAttemptEvent(taID, TaskAttemptEventType.TA_DONE)); super.dispatch(new TaskAttemptEvent(taID, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); super.dispatch(new TaskTAttemptEvent(taID, TaskEventType.T_ATTEMPT_SUCCEEDED)); this.cachedKillEvent = killEvent; @@ -211,40 +211,9 @@ public class TestKill { app.getContext().getEventHandler() .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); - app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); - } - - static class MyAsyncDispatch extends AsyncDispatcher { - private CountDownLatch latch; - private TaskAttemptEventType attemptEventTypeToWait; - MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) { - super(); - this.latch = latch; - this.attemptEventTypeToWait = attemptEventTypeToWait; - } - - @Override - protected void dispatch(Event event) { - if (event instanceof TaskAttemptEvent) { - TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event; - TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID(); - if (attemptEvent.getType() == this.attemptEventTypeToWait - && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) { - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - super.dispatch(event); - } + app.waitForInternalState((JobImpl) job, JobStateInternal.KILLED); } - // This is to test a race condition where JobEventType.JOB_KILL is 
generated - // right after TaskAttemptEventType.TA_DONE is generated. - // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED - // and T_ATTEMPT_KILLED from the same attempt. @Test public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { CountDownLatch latch = new CountDownLatch(1); @@ -269,15 +238,12 @@ public class TestKill { TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); - // The order in the dispatch event queue, from the oldest to the newest + // The order in the dispatch event queue, from first to last // TA_DONE - // JOB_KILL - // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling ) - // T_KILL ( from JOB_KILL's handling ) - // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling ) - // TA_KILL ( from T_KILL's handling ) - // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling ) - // T_ATTEMPT_KILLED ( from TA_KILL's handling ) + // JobEventType.JOB_KILL + // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling ) + // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling ) + // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling ) // Finish map app.getContext().getEventHandler().handle( @@ -295,6 +261,100 @@ public class TestKill { app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); } + + @Test + public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL); + MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { + @Override + public Dispatcher createDispatcher() { + return dispatcher; + } + }; + Job job = app.submit(new Configuration()); + JobId jobId = app.getJobId(); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size()); + Iterator it = 
job.getTasks().values().iterator(); + Task mapTask = it.next(); + Task reduceTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + app.waitForState(reduceTask, TaskState.RUNNING); + TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); + app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); + + // The order in the dispatch event queue, from first to last + // JobEventType.JOB_KILL + // TA_DONE + // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling ) + // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling ) + // TaskAttemptEventType.TA_KILL ( from TaskEventType.T_KILL handling ) + // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling ) + // TaskEventType.T_ATTEMPT_KILLED ( from TA_KILL handling ) + + // Now kill the job + app.getContext().getEventHandler() + .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + + // Finish map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapAttempt.getID(), + TaskAttemptEventType.TA_DONE)); + + //unblock + latch.countDown(); + + app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); + } + + static class MyAsyncDispatch extends AsyncDispatcher { + private CountDownLatch latch; + private TaskAttemptEventType attemptEventTypeToWait; + private JobEventType jobEventTypeToWait; + MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) { + super(); + this.latch = latch; + this.attemptEventTypeToWait = attemptEventTypeToWait; + } + + MyAsyncDispatch(CountDownLatch latch, JobEventType jobEventTypeToWait) { + super(); + this.latch = latch; + this.jobEventTypeToWait = jobEventTypeToWait; + } + + @Override + protected void dispatch(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event; + 
TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID(); + if (attemptEvent.getType() == this.attemptEventTypeToWait + && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } else if ( event instanceof JobEvent) { + JobEvent jobEvent = (JobEvent) event; + if (jobEvent.getType() == this.jobEventTypeToWait) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + super.dispatch(event); + } + } + @Test public void testKillTaskAttempt() throws Exception { final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/444836b3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 69f2709..475cd1f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -884,5 +884,10 @@ public class TestRuntimeEstimators { // bogus - Not Required return null; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return null; + } } }