From: yhemanth@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Subject: svn commit: r750853 - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Fri, 06 Mar 2009 11:30:12 -0000

Author: yhemanth
Date: Fri Mar 6 11:30:06 2009
New Revision: 750853

URL: http://svn.apache.org/viewvc?rev=750853&view=rev
Log:
HADOOP-5376. Fixes the code handling lost tasktrackers to set the task state
to KILLED_UNCLEAN only for the relevant types of tasks. Contributed by
Amareshwari Sriramadasu.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=750853&r1=750852&r2=750853&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Mar 6 11:30:06 2009
@@ -701,6 +701,10 @@
     HADOOP-5384. Fix a problem that DataNodeCluster creates blocks with
     generationStamp == 1. (szetszwo)
 
+    HADOOP-5376. Fixes the code handling lost tasktrackers to set the task
+    state to KILLED_UNCLEAN only for the relevant types of tasks.
+    (Amareshwari Sriramadasu via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750853&r1=750852&r2=750853&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 6 11:30:06 2009
@@ -3264,13 +3264,16 @@
             // if the job is done, we don't want to change anything
             if (job.getStatus().getRunState() == JobStatus.RUNNING ||
                 job.getStatus().getRunState() == JobStatus.PREP) {
+              // the state will be KILLED_UNCLEAN only if the task (map or
+              // reduce) was RUNNING on the tracker
+              TaskStatus.State killState = (tip.isRunningTask(taskId) &&
+                !tip.isJobSetupTask() && !tip.isJobCleanupTask()) ?
+                TaskStatus.State.KILLED_UNCLEAN : TaskStatus.State.KILLED;
               job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName),
                              (tip.isMapTask() ?
                                  TaskStatus.Phase.MAP :
                                  TaskStatus.Phase.REDUCE),
-                             tip.isRunningTask(taskId) ?
-                               TaskStatus.State.KILLED_UNCLEAN :
-                               TaskStatus.State.KILLED,
+                             killState,
                              trackerName, myInstrumentation);
               jobsWithFailures.add(job);
             }
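
The heart of the patch is the new killState computation above. Restated as a
standalone helper, it reads as follows; this is an illustrative sketch only
(stateForLostTracker is not code from the patch), assuming the branch-0.20
org.apache.hadoop.mapred package, where TaskInProgress is visible:

    // Sketch of the decision in the hunk above. Only a map or reduce attempt
    // that was RUNNING on the lost tracker is marked KILLED_UNCLEAN, so that
    // a task-cleanup attempt is scheduled for it. Setup and cleanup attempts
    // have no task-cleanup of their own, so they go straight to KILLED;
    // previously they too could end up KILLED_UNCLEAN, which is the bug
    // HADOOP-5376 fixes.
    static TaskStatus.State stateForLostTracker(TaskInProgress tip,
                                                TaskAttemptID taskId) {
      return (tip.isRunningTask(taskId)
              && !tip.isJobSetupTask() && !tip.isJobCleanupTask())
          ? TaskStatus.State.KILLED_UNCLEAN
          : TaskStatus.State.KILLED;
    }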
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=750853&r1=750852&r2=750853&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 6 11:30:06 2009
@@ -590,7 +590,7 @@
     // Check if the user manually KILLED/FAILED this task-attempt...
     Boolean shouldFail = tasksToKill.remove(taskid);
     if (shouldFail != null) {
-      if (isCleanupAttempt(taskid)) {
+      if (isCleanupAttempt(taskid) || jobSetup || jobCleanup) {
         taskState = (shouldFail) ? TaskStatus.State.FAILED :
                                    TaskStatus.State.KILLED;
       } else {
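
The widened condition means that a user-issued kill or fail of a setup or
cleanup attempt now resolves directly to a final state instead of routing
through the unclean-kill machinery. A minimal sketch of the resulting
decision (not code from the patch; it assumes, as the surrounding branch-0.20
code suggests, that the else branch not shown in the hunk picks the
*_UNCLEAN counterparts):

    // finalKill corresponds to isCleanupAttempt(taskid) || jobSetup ||
    // jobCleanup in the hunk above.
    static TaskStatus.State stateForUserKill(boolean shouldFail,
                                             boolean finalKill) {
      if (finalKill) {
        // cleanup attempts and setup/cleanup TIPs have no task-cleanup to
        // run, so they go straight to a final state
        return shouldFail ? TaskStatus.State.FAILED : TaskStatus.State.KILLED;
      }
      // an ordinary running attempt is first marked *_UNCLEAN so that its
      // task-cleanup attempt can run before the state becomes final
      return shouldFail ? TaskStatus.State.FAILED_UNCLEAN
                        : TaskStatus.State.KILLED_UNCLEAN;
    }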
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=750853&r1=750852&r2=750853&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 6 11:30:06 2009
@@ -590,6 +590,19 @@
   }
 
   /**
+   * Get the tasktrackerID in MiniMRCluster with given trackerName.
+   */
+  int getTaskTrackerID(String trackerName) {
+    for (int id=0; id < numTaskTrackers; id++) {
+      if (taskTrackerList.get(id).getTaskTracker().getName().equals(
+          trackerName)) {
+        return id;
+      }
+    }
+    return -1;
+  }
+
+  /**
    * Shut down the servers.
    */
   public void shutdown() {
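
The new accessor lets a test map a tracker name, as reported by the
JobTracker, back to that tracker's index in the mini cluster so the tracker
can be stopped. A short usage sketch, mirroring killTaskWithLostTracker in
the test below (mr, jt and taskid are assumed to be in scope):

    // Stop the tracker hosting a given attempt to simulate a lost
    // tasktracker.
    String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
    int trackerID = mr.getTaskTrackerID(trackerName);
    if (trackerID != -1) {
      // the JobTracker declares the tracker lost once
      // mapred.tasktracker.expiry.interval elapses without a heartbeat
      mr.stopTaskTracker(trackerID);
    }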
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=750853&r1=750852&r2=750853&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Mar 6 11:30:06 2009
@@ -29,8 +29,19 @@
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 
+/**
+ * Tests various failures in setup/cleanup of a job, such as the committer
+ * throwing an exception, a command-line kill, and a lost tracker.
+ */
 public class TestSetupAndCleanupFailure extends TestCase {
+  final Path inDir = new Path("./input");
+  final Path outDir = new Path("./output");
+  static Path setupSignalFile = new Path("/setup-signal");
+  static Path cleanupSignalFile = new Path("/cleanup-signal");
+  String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+
+  // Committer with setupJob throwing an exception
   static class CommitterWithFailSetup extends FileOutputCommitter {
     @Override
     public void setupJob(JobContext context) throws IOException {
@@ -38,17 +49,42 @@
     }
   }
 
+  // Committer with cleanupJob throwing an exception
   static class CommitterWithFailCleanup extends FileOutputCommitter {
     @Override
     public void cleanupJob(JobContext context) throws IOException {
       throw new IOException();
     }
   }
+
+  // Committer that waits for a signal file to be created on DFS.
+  static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
+
+    private void waitForSignalFile(FileSystem fs, Path signalFile)
+    throws IOException {
+      while (!fs.exists(signalFile)) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile);
+      super.setupJob(context);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
+      super.cleanupJob(context);
+    }
+  }
 
-  public RunningJob launchJob(JobConf conf,
-                              Path inDir,
-                              Path outDir,
-                              String input)
+  public RunningJob launchJob(JobConf conf)
   throws IOException {
     // set up the input file system and write input text.
     FileSystem inFs = inDir.getFileSystem(conf);
@@ -76,41 +112,178 @@
     // return the RunningJob handle.
     return new JobClient(conf).submitJob(conf);
   }
+
+  // Among these TIPs only one task will be running;
+  // get the task id for that task.
+  private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) {
+    TaskAttemptID taskid = null;
+    while (taskid == null) {
+      for (TaskInProgress tip : tips) {
+        TaskStatus[] statuses = tip.getTaskStatuses();
+        for (TaskStatus status : statuses) {
+          if (status.getRunState() == TaskStatus.State.RUNNING) {
+            taskid = status.getTaskID();
+            break;
+          }
+        }
+        if (taskid != null) break;
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+    return taskid;
+  }
+
+  // Tests a failure in setup/cleanup of a job. The job should cleanly fail.
+  private void testFailCommitter(Class theClass,
+                                 JobConf jobConf)
+  throws IOException {
+    jobConf.setOutputCommitter(theClass);
+    RunningJob job = launchJob(jobConf);
+    // wait for the job to finish.
+    job.waitForCompletion();
+    assertEquals(JobStatus.FAILED, job.getJobState());
+  }
+
+  // Launch a job with CommitterWithLongSetupAndCleanup as the committer
+  // and wait till the job is inited.
+  private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr)
+  throws IOException {
+    // launch job with waiting setup/cleanup
+    JobConf jobConf = mr.createJobConf();
+    jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
+    RunningJob job = launchJob(jobConf);
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jt.getJob(job.getID());
+    while (!jip.inited()) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+    return job;
+  }
+
+  /**
+   * Tests setup and cleanup attempts getting killed from the command line
+   * and via a lost tracker.
+   *
+   * @param mr the MiniMRCluster
+   * @param dfs the MiniDFSCluster
+   * @param commandLineKill if true, test with a command-line kill;
+   *                        else, test with a lost tracker
+   * @throws IOException
+   */
+  private void testSetupAndCleanupKill(MiniMRCluster mr,
+                                       MiniDFSCluster dfs,
+                                       boolean commandLineKill)
+  throws IOException {
+    // launch job with waiting setup/cleanup
+    RunningJob job = launchJobWithWaitingSetupAndCleanup(mr);
+
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jt.getJob(job.getID());
+    // get the running setup task id
+    TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
+    if (commandLineKill) {
+      killTaskFromCommandLine(job, setupID, jt);
+    } else {
+      killTaskWithLostTracker(mr, setupID);
+    }
+    // signal the setup to complete
+    UtilsForTests.writeFile(dfs.getNameNode(),
+                            dfs.getFileSystem().getConf(),
+                            setupSignalFile, (short) 3);
+    // wait for maps and reduces to complete
+    while (job.reduceProgress() != 1.0f) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {}
+    }
+    // get the running cleanup task id
+    TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
+    if (commandLineKill) {
+      killTaskFromCommandLine(job, cleanupID, jt);
+    } else {
+      killTaskWithLostTracker(mr, cleanupID);
+    }
+    // signal the cleanup to complete
+    UtilsForTests.writeFile(dfs.getNameNode(),
+                            dfs.getFileSystem().getConf(),
+                            cleanupSignalFile, (short) 3);
+    // wait for the job to finish.
+    job.waitForCompletion();
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+    assertEquals(TaskStatus.State.KILLED,
+                 jt.getTaskStatus(setupID).getRunState());
+    assertEquals(TaskStatus.State.KILLED,
+                 jt.getTaskStatus(cleanupID).getRunState());
+  }
+
+  // Kill the task from the command line
+  // and wait till the kill is reported back.
+  private void killTaskFromCommandLine(RunningJob job,
+                                       TaskAttemptID taskid,
+                                       JobTracker jt)
+  throws IOException {
+    job.killTask(taskid, false);
+    // wait till the kill happens
+    while (jt.getTaskStatus(taskid).getRunState() !=
+           TaskStatus.State.KILLED) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+  }
+
+  // Kill the task by stopping the tracker it is running on.
+  private void killTaskWithLostTracker(MiniMRCluster mr,
+                                       TaskAttemptID taskid) {
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
+    int trackerID = mr.getTaskTrackerID(trackerName);
+    assertTrue(trackerID != -1);
+    mr.stopTaskTracker(trackerID);
+  }
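
Both kill variants rely on the DFS signal files as a cross-JVM barrier: the
committer blocks in waitForSignalFile() inside the task's JVM until the test,
running in another process, creates the file via UtilsForTests.writeFile().
The polling loops above spin indefinitely and lean on the test-harness
timeout; a bounded variant would look like the following sketch (illustrative
only, not part of the patch):

    // Illustrative bounded wait: fail fast on timeout instead of spinning
    // until the harness kills the test.
    private static boolean waitForSignalFile(FileSystem fs, Path signalFile,
                                             long timeoutMillis)
        throws IOException {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      while (!fs.exists(signalFile)) {
        if (System.currentTimeMillis() > deadline) {
          return false;                       // caller can fail the test
        }
        try {
          Thread.sleep(100);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt
          return false;
        }
      }
      return true;
    }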
+
+  // Tests the failures in setup/cleanup of a job: the job should cleanly
+  // fail. Also tests the command-line kill for setup/cleanup attempts, and
+  // tests the setup/cleanup attempts getting killed if
+  // they were running on a lost tracker.
   public void testWithDFS() throws IOException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
     try {
-      final int taskTrackers = 2;
-
+      final int taskTrackers = 4;
       Configuration conf = new Configuration();
-      dfs = new MiniDFSCluster(conf, 2, true, null);
+      dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
-      JobConf jobConf = mr.createJobConf();
-      final Path inDir = new Path("./input");
-      final Path outDir = new Path("./output");
-      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
-      RunningJob job = null;
-
-      jobConf.setOutputCommitter(CommitterWithFailSetup.class);
-      job = launchJob(jobConf, inDir, outDir, input);
-      // wait for the job to finish.
-      job.waitForCompletion();
-      assertEquals(JobStatus.FAILED, job.getJobState());
-
-      jobConf.setOutputCommitter(CommitterWithFailCleanup.class);
-      job = launchJob(jobConf, inDir, outDir, input);
-      // wait for the job to finish.
-      job.waitForCompletion();
-      assertEquals(JobStatus.FAILED, job.getJobState());
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
+                             null, null, jtConf);
+      // test setup/cleanup throwing exceptions
+      testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
+      testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
+      // test the command-line kill for setup/cleanup attempts.
+      testSetupAndCleanupKill(mr, dfs, true);
+      // remove setup/cleanup signal files.
+      fileSys.delete(setupSignalFile, true);
+      fileSys.delete(cleanupSignalFile, true);
+      // test the setup/cleanup attempts getting killed if
+      // they were running on a lost tracker
+      testSetupAndCleanupKill(mr, dfs, false);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }
     }
   }
+
   public static void main(String[] argv) throws Exception {
     TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure();
     td.testWithDFS();
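
A closing note on the cluster tuning in testWithDFS: with one map slot and
one reduce slot per tracker across four trackers, the blocked setup/cleanup
attempt is the only task on its tracker, so stopping that tracker kills
exactly the intended attempt, and the ten-second
mapred.tasktracker.expiry.interval keeps lost-tracker detection quick. A test
that wanted to assert the expiry explicitly could poll the JobTracker's live
tracker list; the following is a hypothetical sketch, not part of the patch,
and assumes the branch-0.20 JobTracker.taskTrackers() accessor:

    // Hypothetical helper: block until the stopped tracker drops out of the
    // JobTracker's live tracker list.
    private static void waitForTrackerLoss(JobTracker jt, String trackerName)
        throws InterruptedException {
      boolean present = true;
      while (present) {
        present = false;
        for (Object o : jt.taskTrackers()) {  // Collection of TaskTrackerStatus
          if (((TaskTrackerStatus) o).getTrackerName().equals(trackerName)) {
            present = true;
            break;
          }
        }
        if (present) {
          Thread.sleep(100);
        }
      }
    }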