Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 69668 invoked from network); 26 Feb 2009 05:40:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 26 Feb 2009 05:40:20 -0000 Received: (qmail 42753 invoked by uid 500); 26 Feb 2009 05:40:20 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 42713 invoked by uid 500); 26 Feb 2009 05:40:20 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 42704 invoked by uid 99); 26 Feb 2009 05:40:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Feb 2009 21:40:20 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Feb 2009 05:40:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9048F2388999; Thu, 26 Feb 2009 05:39:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r748025 - in /hadoop/core/branches/branch-0.19: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Thu, 26 Feb 2009 05:39:49 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090226053950.9048F2388999@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Thu Feb 26 05:39:47 2009 New Revision: 748025 URL: http://svn.apache.org/viewvc?rev=748025&view=rev Log: HADOOP-5269. Committing the patch to the 0.19 branch. Modified: hadoop/core/branches/branch-0.19/CHANGES.txt hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Child.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java Modified: hadoop/core/branches/branch-0.19/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=748025&r1=748024&r2=748025&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/CHANGES.txt (original) +++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Feb 26 05:39:47 2009 @@ -7,6 +7,9 @@ HADOOP-5154. Fixes a deadlock in the fairshare scheduler. (Matei Zaharia via yhemanth) + HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN + or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas) + Release 0.19.1 - 2009-02-23 INCOMPATIBLE CHANGES Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Child.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Child.java?rev=748025&r1=748024&r2=748025&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Child.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Child.java Thu Feb 26 05:39:47 2009 @@ -173,9 +173,13 @@ umbilical.fsError(taskid, e.getMessage()); } catch (Throwable throwable) { LOG.warn("Error running child", throwable); - if (task != null) { - // do cleanup for the task - task.taskCleanup(umbilical); + try { + if (task != null) { + // do cleanup for the task + task.taskCleanup(umbilical); + } + } catch (Throwable th) { + LOG.info("Error cleaning up" + th); } // Report back any failures, for diagnostic purposes ByteArrayOutputStream baos = new ByteArrayOutputStream(); Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=748025&r1=748024&r2=748025&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Thu Feb 26 05:39:47 2009 @@ -129,7 +129,7 @@ } } - public void abortTask(TaskAttemptContext context) { + public void abortTask(TaskAttemptContext context) throws IOException { Path taskOutputPath = getTempTaskOutputPath(context); try { if (taskOutputPath != null) { Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=748025&r1=748024&r2=748025&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 26 05:39:47 2009 @@ -2027,6 +2027,34 @@ releaseSlot(); } + /* State changes: + * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED/KILLED_UNCLEAN/KILLED + * FAILED_UNCLEAN -> FAILED + * KILLED_UNCLEAN -> KILLED + */ + private void setTaskFailState(boolean wasFailure) { + // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always + if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.KILLED); + } else if (task.isMapOrReduce() && + taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) { + if (wasFailure) { + taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN); + } else { + taskStatus.setRunState(TaskStatus.State.KILLED_UNCLEAN); + } + } else { + if (wasFailure) { + taskStatus.setRunState(TaskStatus.State.FAILED); + } else { + taskStatus.setRunState(TaskStatus.State.KILLED); + } + } + } + /** * The task has actually finished running. */ @@ -2053,22 +2081,7 @@ if (!done) { if (!wasKilled) { failures += 1; - /* State changes: - * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED - * FAILED_UNCLEAN -> FAILED - * KILLED_UNCLEAN -> KILLED - */ - if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { - taskStatus.setRunState(TaskStatus.State.FAILED); - } else if (taskStatus.getRunState() == - TaskStatus.State.KILLED_UNCLEAN) { - taskStatus.setRunState(TaskStatus.State.KILLED); - } else if (task.isMapOrReduce() && - taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) { - taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN); - } else { - taskStatus.setRunState(TaskStatus.State.FAILED); - } + setTaskFailState(true); removeFromMemoryManager(task.getTaskID()); // call the script here for the failed tasks. if (debugCommand != null) { @@ -2283,13 +2296,6 @@ * @param wasFailure was it a failure (versus a kill request)? */ public synchronized void kill(boolean wasFailure) throws IOException { - /* State changes: - * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED - * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN - * FAILED_UNCLEAN -> FAILED - * KILLED_UNCLEAN -> KILLED - * UNASSIGNED -> FAILED/KILLED - */ if (taskStatus.getRunState() == TaskStatus.State.RUNNING || taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING || isCleaningup()) { @@ -2298,23 +2304,7 @@ failures += 1; } runner.kill(); - if (task.isMapOrReduce()) { - taskStatus.setRunState((wasFailure) ? - TaskStatus.State.FAILED_UNCLEAN : - TaskStatus.State.KILLED_UNCLEAN); - } else { - // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always - if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { - taskStatus.setRunState(TaskStatus.State.FAILED); - } else if (taskStatus.getRunState() == - TaskStatus.State.KILLED_UNCLEAN) { - taskStatus.setRunState(TaskStatus.State.KILLED); - } else { - taskStatus.setRunState((wasFailure) ? - TaskStatus.State.FAILED : - TaskStatus.State.KILLED); - } - } + setTaskFailState(wasFailure); } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=748025&r1=748024&r2=748025&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original) +++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java Thu Feb 26 05:39:47 2009 @@ -49,6 +49,18 @@ } } + static class CommitterWithFailTaskCleanup extends FileOutputCommitter { + public void abortTask(TaskAttemptContext context) throws IOException { + System.exit(-1); + } + } + + static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter { + public void abortTask(TaskAttemptContext context) throws IOException { + throw new IOException(); + } + } + public RunningJob launchJob(JobConf conf, Path inDir, Path outDir, @@ -79,7 +91,34 @@ // return the RunningJob handle. return new JobClient(conf).submitJob(conf); } - + + private void validateJob(RunningJob job, MiniMRCluster mr) + throws IOException { + assertEquals(JobStatus.SUCCEEDED, job.getJobState()); + + JobID jobId = job.getID(); + // construct the task id of first map task + TaskAttemptID attemptId = + new TaskAttemptID(new TaskID(jobId, true, 0), 0); + TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker(). + getTip(attemptId.getTaskID()); + // this should not be cleanup attempt since the first attempt + // fails with an exception + assertTrue(!tip.isCleanupAttempt(attemptId)); + TaskStatus ts = + mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId); + assertTrue(ts != null); + assertEquals(TaskStatus.State.FAILED, ts.getRunState()); + + attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1); + // this should be cleanup attempt since the second attempt fails + // with System.exit + assertTrue(tip.isCleanupAttempt(attemptId)); + ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId); + assertTrue(ts != null); + assertEquals(TaskStatus.State.FAILED, ts.getRunState()); + } + public void testWithDFS() throws IOException { MiniDFSCluster dfs = null; MiniMRCluster mr = null; @@ -91,39 +130,25 @@ dfs = new MiniDFSCluster(conf, 4, true, null); fileSys = dfs.getFileSystem(); mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1); - JobConf jobConf = mr.createJobConf(); final Path inDir = new Path("./input"); final Path outDir = new Path("./output"); String input = "The quick brown fox\nhas many silly\nred fox sox\n"; - RunningJob job = null; - - job = launchJob(jobConf, inDir, outDir, input); - // wait for the job to finish. - while (!job.isComplete()); - assertEquals(JobStatus.SUCCEEDED, job.getJobState()); - - JobID jobId = job.getID(); - // construct the task id of first map task - TaskAttemptID attemptId = - new TaskAttemptID(new TaskID(jobId, true, 0), 0); - TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker(). - getTip(attemptId.getTaskID()); - // this should not be cleanup attempt since the first attempt - // fails with an exception - assertTrue(!tip.isCleanupAttempt(attemptId)); - TaskStatus ts = - mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId); - assertTrue(ts != null); - assertEquals(TaskStatus.State.FAILED, ts.getRunState()); - - attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1); - // this should be cleanup attempt since the second attempt fails - // with System.exit - assertTrue(tip.isCleanupAttempt(attemptId)); - ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId); - assertTrue(ts != null); - assertEquals(TaskStatus.State.FAILED, ts.getRunState()); - + // launch job with fail tasks + RunningJob rJob = launchJob(mr.createJobConf(), inDir, outDir, input); + rJob.waitForCompletion(); + validateJob(rJob, mr); + // launch job with fail tasks and fail-cleanups + JobConf jobConf = mr.createJobConf(); + fileSys.delete(outDir, true); + jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class); + rJob = launchJob(jobConf, inDir, outDir, input); + rJob.waitForCompletion(); + validateJob(rJob, mr); + fileSys.delete(outDir, true); + jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class); + rJob = launchJob(jobConf, inDir, outDir, input); + rJob.waitForCompletion(); + validateJob(rJob, mr); } finally { if (dfs != null) { dfs.shutdown(); } if (mr != null) { mr.shutdown(); }