Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5E2A718558 for ; Fri, 14 Aug 2015 19:40:45 +0000 (UTC) Received: (qmail 74554 invoked by uid 500); 14 Aug 2015 19:40:45 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 74486 invoked by uid 500); 14 Aug 2015 19:40:45 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 74477 invoked by uid 99); 14 Aug 2015 19:40:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Aug 2015 19:40:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E1AB9E0435; Fri, 14 Aug 2015 19:40:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kasha@apache.org To: common-commits@hadoop.apache.org Message-Id: <9348dc38db5a43ff9c20c6b6a7740e2e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: MAPREDUCE-5817. Mappers get rescheduled on node transition even after all reducers are completed. (Sangjin Lee via kasha) (cherry picked from commit 27d24f96ab8d17e839a1ef0d7076efc78d28724a) Date: Fri, 14 Aug 2015 19:40:44 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 17256e2ae -> b82616817 MAPREDUCE-5817. Mappers get rescheduled on node transition even after all reducers are completed. (Sangjin Lee via kasha) (cherry picked from commit 27d24f96ab8d17e839a1ef0d7076efc78d28724a) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8261681 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8261681 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8261681 Branch: refs/heads/branch-2 Commit: b826168173c3386738acc12a5d62577f12aa06e9 Parents: 17256e2 Author: Karthik Kambatla Authored: Fri Aug 14 12:29:50 2015 -0700 Committer: Karthik Kambatla Committed: Fri Aug 14 12:40:38 2015 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/JobImpl.java | 38 ++++-- .../mapreduce/v2/app/job/impl/TestJobImpl.java | 130 ++++++++++++++++++- 3 files changed, 156 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8261681/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 66a9aac..2113c76 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -283,6 +283,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu) + MAPREDUCE-5817. Mappers get rescheduled on node transition even after all + reducers are completed. (Sangjin Lee via kasha) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8261681/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 8623dc5..ab94960 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** Implementation of Job interface. Maintains the state machines of Job. @@ -1328,15 +1329,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { // rerun previously successful map tasks - List taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); - if(taskAttemptIdList != null) { - String mesg = "TaskAttempt killed because it ran on unusable node " - + nodeId; - for(TaskAttemptId id : taskAttemptIdList) { - if(TaskType.MAP == id.getTaskId().getTaskType()) { - // reschedule only map tasks because their outputs maybe unusable - LOG.info(mesg + ". AttemptId:" + id); - eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + // do this only if the job is still in the running state and there are + // running reducers + if (getInternalState() == JobStateInternal.RUNNING && + !allReducersComplete()) { + List taskAttemptIdList = + nodesToSucceededTaskAttempts.get(nodeId); + if (taskAttemptIdList != null) { + String mesg = "TaskAttempt killed because it ran on unusable node " + + nodeId; + for (TaskAttemptId id : taskAttemptIdList) { + if (TaskType.MAP == id.getTaskId().getTaskType()) { + // reschedule only map tasks because their outputs maybe unusable + LOG.info(mesg + ". AttemptId:" + id); + eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + } } } } @@ -1344,6 +1351,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // RMContainerAllocator } + private boolean allReducersComplete() { + return numReduceTasks == 0 || numReduceTasks == getCompletedReduces(); + } + /* private int getBlockSize() { String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR); @@ -2082,13 +2093,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } + @VisibleForTesting + void decrementSucceededMapperCount() { + completedTaskCount--; + succeededMapTaskCount--; + } + private static class MapTaskRescheduledTransition implements SingleArcTransition { @Override public void transition(JobImpl job, JobEvent event) { //succeeded map task is restarted back - job.completedTaskCount--; - job.succeededMapTaskCount--; + job.decrementSucceededMapperCount(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8261681/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index cae9663..2af4380 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -19,15 +19,21 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -35,10 +41,6 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.jobhistory.EventType; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRConfig; @@ -46,10 +48,17 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; @@ -64,7 +73,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; @@ -76,6 +89,9 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -508,6 +524,112 @@ public class TestJobImpl { commitHandler.stop(); } + @Test(timeout=20000) + public void testUnusableNodeTransition() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.setInt(MRJobConfig.NUM_REDUCES, 1); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); + // add a special task event handler to put the task back to running in case + // of task rescheduling/killing + EventHandler taskAttemptEventHandler = + new EventHandler() { + @Override + public void handle(TaskAttemptEvent event) { + if (event.getType() == TaskAttemptEventType.TA_KILL) { + job.decrementSucceededMapperCount(); + } + } + }; + dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); + + // replace the tasks with spied versions to return the right attempts + Map spiedTasks = new HashMap(); + List nodeReports = new ArrayList(); + Map nodeReportsToTaskIds = + new HashMap(); + for (Map.Entry e: job.tasks.entrySet()) { + TaskId taskId = e.getKey(); + Task task = e.getValue(); + if (taskId.getTaskType() == TaskType.MAP) { + // add an attempt to the task to simulate nodes + NodeId nodeId = mock(NodeId.class); + TaskAttempt attempt = mock(TaskAttempt.class); + when(attempt.getNodeId()).thenReturn(nodeId); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + when(attempt.getID()).thenReturn(attemptId); + // create a spied task + Task spied = spy(task); + doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class)); + spiedTasks.put(taskId, spied); + + // create a NodeReport based on the node id + NodeReport report = mock(NodeReport.class); + when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY); + when(report.getNodeId()).thenReturn(nodeId); + nodeReports.add(report); + nodeReportsToTaskIds.put(report, taskId); + } + } + // replace the tasks with the spied tasks + job.tasks.putAll(spiedTasks); + + // complete all mappers first + for (TaskId taskId: job.tasks.keySet()) { + if (taskId.getTaskType() == TaskType.MAP) { + // generate a task attempt completed event first to populate the + // nodes-to-succeeded-attempts map + TaskAttemptCompletionEvent tce = + Records.newRecord(TaskAttemptCompletionEvent.class); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + tce.setAttemptId(attemptId); + tce.setStatus(TaskAttemptCompletionEventStatus.SUCCEEDED); + job.handle(new JobTaskAttemptCompletedEvent(tce)); + // complete the task itself + job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + Assert.assertEquals(JobState.RUNNING, job.getState()); + } + } + + // add an event for a node transition + NodeReport firstMapperNodeReport = nodeReports.get(0); + NodeReport secondMapperNodeReport = nodeReports.get(1); + job.handle(new JobUpdatedNodesEvent(job.getID(), + Collections.singletonList(firstMapperNodeReport))); + // complete the reducer + for (TaskId taskId: job.tasks.keySet()) { + if (taskId.getTaskType() == TaskType.REDUCE) { + job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + } + } + // add another event for a node transition for the other mapper + // this should not trigger rescheduling + job.handle(new JobUpdatedNodesEvent(job.getID(), + Collections.singletonList(secondMapperNodeReport))); + // complete the first mapper that was rescheduled + TaskId firstMapper = nodeReportsToTaskIds.get(firstMapperNodeReport); + job.handle(new JobTaskEvent(firstMapper, TaskState.SUCCEEDED)); + // verify the state is moving to committing + assertJobState(job, JobStateInternal.COMMITTING); + + // let the committer complete and verify the job succeeds + syncBarrier.await(); + assertJobState(job, JobStateInternal.SUCCEEDED); + + dispatcher.stop(); + commitHandler.stop(); + } + public static void main(String[] args) throws Exception { TestJobImpl t = new TestJobImpl(); t.testJobNoTasks();