From: wangda@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 18 Sep 2015 18:13:28 -0000
Subject: [08/34] hadoop git commit: MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li

MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
disappear. Contributed by Chang Li

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee4ee6af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee4ee6af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee4ee6af

Branch: refs/heads/YARN-1197
Commit: ee4ee6af6a5a6299d27462adb6944206039bbbae
Parents: 9eee975
Author: Jason Lowe
Authored: Thu Sep 17 21:37:39 2015 +0000
Committer: Jason Lowe
Committed: Thu Sep 17 21:37:39 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/job/impl/TaskAttemptImpl.java        |  92 +++++------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java   |  11 +-
 .../v2/app/job/impl/TestTaskAttempt.java        | 154 +++++++++++++++++++
 4 files changed, 213 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 6cf7abb..cd84a34 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -573,6 +573,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
     attempt (Chang Li via jlowe)
 
+    MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
+    disappear (Chang Li via jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 77a7555..a7becdb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1484,6 +1484,19 @@ public abstract class TaskAttemptImpl implements
     return tauce;
   }
 
+  private static void
+      sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
+    TaskAttemptContainerLaunchedEvent event;
+    taskAttempt.launchTime = taskAttempt.clock.getTime();
+
+    InetSocketAddress nodeHttpInetAddr =
+        NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
+    taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+    taskAttempt.httpPort = nodeHttpInetAddr.getPort();
+    taskAttempt.sendLaunchedEvents();
+  }
+
+
   @SuppressWarnings("unchecked")
   private void sendLaunchedEvents() {
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@@ -1681,6 +1694,9 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
@@ -1715,23 +1731,19 @@ public abstract class TaskAttemptImpl implements
       default:
         LOG.error("Task final state is not FAILED or KILLED: " + finalState);
       }
-      if (taskAttempt.getLaunchTime() != 0) {
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                finalState);
-        if(finalState == TaskAttemptStateInternal.FAILED) {
-          taskAttempt.eventHandler
-              .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        } else if(finalState == TaskAttemptStateInternal.KILLED) {
-          taskAttempt.eventHandler
-              .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        }
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      } else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
+
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              finalState);
+      if(finalState == TaskAttemptStateInternal.FAILED) {
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      } else if(finalState == TaskAttemptStateInternal.KILLED) {
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
       }
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
     }
   }
@@ -2023,27 +2035, @@ public abstract class TaskAttemptImpl implements
     @Override
    public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.KILLED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
+
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       if (event instanceof TaskAttemptKillEvent) {
         taskAttempt.addDiagnosticInfo(
             ((TaskAttemptKillEvent) event).getMessage());
       }
 
-//      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));
@@ -2178,23 +2188,19 @@ public abstract class TaskAttemptImpl implements
   @SuppressWarnings("unchecked")
   private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+    if (taskAttempt.getLaunchTime() == 0) {
+      sendJHStartEventForAssignedFailTask(taskAttempt);
+    }
     // set the finish time
     taskAttempt.setFinishTime();
+    taskAttempt.eventHandler
+        .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+    TaskAttemptUnsuccessfulCompletionEvent tauce =
+        createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+            TaskAttemptStateInternal.FAILED);
+    taskAttempt.eventHandler.handle(new JobHistoryEvent(
+        taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-    if (taskAttempt.getLaunchTime() != 0) {
-      taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-      TaskAttemptUnsuccessfulCompletionEvent tauce =
-          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-              TaskAttemptStateInternal.FAILED);
-      taskAttempt.eventHandler.handle(new JobHistoryEvent(
-          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
-      // handling failed map/reduce events.
-    }else {
-      LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
-    }
     taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
         taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index b51adf2..f0c10d3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -544,10 +544,7 @@ public class MRApp extends MRAppMaster {
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        getContext().getEventHandler().handle(
-            new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                shufflePort));
-
+        containerLaunched(event.getTaskAttemptID(), shufflePort);
         attemptLaunched(event.getTaskAttemptID());
         break;
       case CONTAINER_REMOTE_CLEANUP:
@@ -561,6 +558,12 @@ public class MRApp extends MRAppMaster {
     }
   }
 
+  protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
+    getContext().getEventHandler().handle(
+        new TaskAttemptContainerLaunchedEvent(attemptID,
+            shufflePort));
+  }
+
   protected void attemptLaunched(TaskAttemptId attemptID) {
     if (autoComplete) {
       // send the done event


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index a88a935..6b4656a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -115,6 +115,69 @@ public class TestTaskAttempt{
   }
 
   @Test
+  public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
+    // test TA_CONTAINER_LAUNCH_FAILED for map
+    FailingAttemptsDuringAssignedMRApp app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_LAUNCH_FAILED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG_BY_CLIENT for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG_BY_CLIENT for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_KILL for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+
+    // test TA_KILL for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+  }
+
+  @Test
   public void testSingleRackRequest() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
         new TaskAttemptImpl.RequestContainerTransition(false);
@@ -301,6 +364,31 @@ public class TestTaskAttempt{
         report.getTaskAttemptState());
   }
 
+  private void testTaskAttemptAssignedFailHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
+  }
+
+  private void testTaskAttemptAssignedKilledHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.KILLED);
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
+  }
+
   static class FailingAttemptsMRApp extends MRApp {
     FailingAttemptsMRApp(int maps, int reduces) {
       super(maps, reduces, true, "FailingAttemptsMRApp", true);
@@ -331,6 +419,72 @@ public class TestTaskAttempt{
     }
   }
 
+  static class FailingAttemptsDuringAssignedMRApp extends MRApp {
+    FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
+        TaskAttemptEventType event) {
+      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+      sendFailEvent = event;
+    }
+
+    TaskAttemptEventType sendFailEvent;
+
+    @Override
+    protected void containerLaunched(TaskAttemptId attemptID,
+        int shufflePort) {
+      //do nothing, not send TA_CONTAINER_LAUNCHED event
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID, sendFailEvent));
+    }
+
+    private boolean receiveTaStartJHEvent = false;
+    private boolean receiveTaFailedJHEvent = false;
+    private boolean receiveTaKilledJHEvent = false;
+
+    public boolean getTaStartJHEvent(){
+      return receiveTaStartJHEvent;
+    }
+
+    public boolean getTaFailedJHEvent(){
+      return receiveTaFailedJHEvent;
+    }
+
+    public boolean getTaKilledJHEvent(){
+      return receiveTaKilledJHEvent;
+    }
+
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new EventHandler<JobHistoryEvent>() {
+        @Override
+        public void handle(JobHistoryEvent event) {
+          if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
+              EventType.MAP_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          }
+        }
+      };
+    }
+  }
+
   @Test
   public void testLaunchFailedWhileKilling() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
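
----------------------------------------------------------------------
Note (not part of the commit above): each terminal transition out of the
ASSIGNED state now follows one pattern -- when the attempt never launched
(launchTime == 0), first synthesize the job-history start event, then emit
the unsuccessful-completion event, so the attempt no longer vanishes from
the history file. A minimal sketch of that pattern, reusing the helper
names from the TaskAttemptImpl.java hunks above; the method name
failFromAssigned is illustrative only and assumes the surrounding
TaskAttemptImpl class context:

  // Sketch only; mirrors the guard added to the failure/kill transitions.
  private static void failFromAssigned(TaskAttemptImpl taskAttempt,
      TaskAttemptStateInternal finalState) {
    if (taskAttempt.getLaunchTime() == 0) {
      // No start event was ever written for this attempt, so create one
      // now; otherwise the finish event below would have been dropped.
      sendJHStartEventForAssignedFailTask(taskAttempt);
    }
    taskAttempt.setFinishTime();
    TaskAttemptUnsuccessfulCompletionEvent tauce =
        createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, finalState);
    taskAttempt.eventHandler.handle(new JobHistoryEvent(
        taskAttempt.attemptId.getTaskId().getJobId(), tauce));
  }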