Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 53E1310695 for ; Mon, 15 Jul 2013 22:25:01 +0000 (UTC) Received: (qmail 69577 invoked by uid 500); 15 Jul 2013 22:25:01 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 69513 invoked by uid 500); 15 Jul 2013 22:25:00 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 69505 invoked by uid 99); 15 Jul 2013 22:25:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jul 2013 22:25:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jul 2013 22:24:58 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 40B6A23889F1; Mon, 15 Jul 2013 22:24:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1503499 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/... Date: Mon, 15 Jul 2013 22:24:38 -0000 To: mapreduce-commits@hadoop.apache.org From: jlowe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130715222438.40B6A23889F1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jlowe Date: Mon Jul 15 22:24:37 2013 New Revision: 1503499 URL: http://svn.apache.org/r1503499 Log: MAPREDUCE-5317. Stale files left behind for failed jobs. Contributed by Ravi Prakash Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1503499&r1=1503498&r2=1503499&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jul 15 22:24:37 2013 @@ -158,6 +158,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl (Devaraj K via jlowe) + MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via + jlowe) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -1220,6 +1223,9 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the input path dir (Devaraj K via jlowe) + MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via + jlowe) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1503499&r1=1503498&r2=1503499&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Mon Jul 15 22:24:37 2013 @@ -25,6 +25,7 @@ public enum JobStateInternal { RUNNING, COMMITTING, SUCCEEDED, + FAIL_WAIT, FAIL_ABORT, FAILED, KILL_WAIT, Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1503499&r1=1503498&r2=1503499&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Mon Jul 15 22:24:37 2013 @@ -44,6 +44,7 @@ public enum JobEventType { //Producer:Job JOB_COMPLETED, + JOB_FAIL_WAIT_TIMEDOUT, //Producer:Any component JOB_DIAGNOSTIC_UPDATE, Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1503499&r1=1503498&r2=1503499&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Jul 15 22:24:37 2013 @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -313,7 +316,8 @@ public class JobImpl implements org.apac .addTransition (JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.RUNNING, - JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT), + JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_ABORT), JobEventType.JOB_TASK_COMPLETED, new TaskCompletedTransition()) .addTransition @@ -424,7 +428,37 @@ public class JobImpl implements org.apac JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_MAP_TASK_RESCHEDULED)) - // Transitions from FAIL_ABORT state + // Transitions from FAIL_WAIT state + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_WAIT, + JobEventType.JOB_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_WAIT, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + .addTransition(JobStateInternal.FAIL_WAIT, + EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT), + JobEventType.JOB_TASK_COMPLETED, + new JobFailWaitTransition()) + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT, + new JobFailWaitTimedOutTransition()) + .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED, + JobEventType.JOB_KILL, + new KilledDuringAbortTransition()) + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able events + .addTransition(JobStateInternal.FAIL_WAIT, + JobStateInternal.FAIL_WAIT, + EnumSet.of(JobEventType.JOB_UPDATED_NODES, + JobEventType.JOB_TASK_ATTEMPT_COMPLETED, + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_AM_REBOOT)) + + //Transitions from FAIL_ABORT state .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAIL_ABORT, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -451,7 +485,8 @@ public class JobImpl implements org.apac JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_AM_REBOOT)) + JobEventType.JOB_AM_REBOOT, + JobEventType.JOB_FAIL_WAIT_TIMEDOUT)) // Transitions from KILL_ABORT state .addTransition(JobStateInternal.KILL_ABORT, @@ -602,6 +637,10 @@ public class JobImpl implements org.apac private JobStateInternal forcedState = null; + //Executor used for running future tasks. Setting thread pool size to 1 + private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + private ScheduledFuture failWaitTriggerScheduledFuture; + public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, @@ -962,6 +1001,7 @@ public class JobImpl implements org.apac case SETUP: case COMMITTING: return JobState.RUNNING; + case FAIL_WAIT: case FAIL_ABORT: return JobState.FAILED; case REBOOT: @@ -1565,7 +1605,43 @@ public class JobImpl implements org.apac job.unsuccessfulFinish(finalState); } } - + + //This transition happens when a job is to be failed. It waits for all the + //tasks to finish / be killed. + private static class JobFailWaitTransition + implements MultipleArcTransition { + @Override + public JobStateInternal transition(JobImpl job, JobEvent event) { + if(!job.failWaitTriggerScheduledFuture.isCancelled()) { + for(Task task: job.tasks.values()) { + if(!task.isFinished()) { + return JobStateInternal.FAIL_WAIT; + } + } + } + //Finished waiting. All tasks finished / were killed + job.failWaitTriggerScheduledFuture.cancel(false); + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + return JobStateInternal.FAIL_ABORT; + } + } + + //This transition happens when a job to be failed times out while waiting on + //tasks that had been sent the KILL signal. It is triggered by a + //ScheduledFuture task queued in the executor. + private static class JobFailWaitTimedOutTransition + implements SingleArcTransition { + @Override + public void transition(JobImpl job, JobEvent event) { + LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." + + " Going to fail job anyway"); + job.failWaitTriggerScheduledFuture.cancel(false); + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + } + } + // JobFinishedEvent triggers the move of the history file out of the staging // area. May need to create a new event type for this if JobFinished should // not be generated for KilledJobs, etc. @@ -1798,6 +1874,23 @@ public class JobImpl implements org.apac return checkJobAfterTaskCompletion(job); } + //This class is used to queue a ScheduledFuture to send an event to a job + //after some delay. This can be used to wait for maximum amount of time + //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for + //all tasks to be killed. + static class TriggerScheduledFuture implements Runnable { + JobEvent toSend; + JobImpl job; + TriggerScheduledFuture(JobImpl job, JobEvent toSend) { + this.toSend = toSend; + this.job = job; + } + public void run() { + LOG.info("Sending event " + toSend + " to " + job.getID()); + job.getEventHandler().handle(toSend); + } + } + protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { //check for Job failure if (job.failedMapTaskCount*100 > @@ -1811,10 +1904,30 @@ public class JobImpl implements org.apac " failedReduces:" + job.failedReduceTaskCount; LOG.info(diagnosticMsg); job.addDiagnostic(diagnosticMsg); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, - org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - return JobStateInternal.FAIL_ABORT; + + //Send kill signal to all unfinished tasks here. + boolean allDone = true; + for (Task task : job.tasks.values()) { + if(!task.isFinished()) { + allDone = false; + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); + } + } + + //If all tasks are already done, we should go directly to FAIL_ABORT + if(allDone) { + return JobStateInternal.FAIL_ABORT; + } + + //Set max timeout to wait for the tasks to get killed + job.failWaitTriggerScheduledFuture = job.executor.schedule( + new TriggerScheduledFuture(job, new JobEvent(job.getID(), + JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt( + MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, + MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS), + TimeUnit.MILLISECONDS); + return JobStateInternal.FAIL_WAIT; } return job.checkReadyForCommit(); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1503499&r1=1503498&r2=1503499&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Mon Jul 15 22:24:37 2013 @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -84,6 +85,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; /** @@ -332,6 +334,39 @@ public class TestJobImpl { commitHandler.stop(); } + @Test + public void testAbortJobCalledAfterKillingTasks() throws IOException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); + InlineDispatcher dispatcher = new InlineDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = Mockito.mock(OutputCommitter.class); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + + //Fail one task. This should land the JobImpl in the FAIL_WAIT state + job.handle(new JobTaskEvent( + MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), + TaskState.FAILED)); + //Verify abort job hasn't been called + Mockito.verify(committer, Mockito.never()) + .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); + assertJobState(job, JobStateInternal.FAIL_WAIT); + + //Verify abortJob is called once and the job failed + Mockito.verify(committer, Mockito.timeout(2000).times(1)) + .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); + assertJobState(job, JobStateInternal.FAILED); + + dispatcher.stop(); + } + @Test(timeout=20000) public void testKilledDuringFailAbort() throws Exception { Configuration conf = new Configuration();