Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 75756 invoked from network); 6 Mar 2009 11:38:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Mar 2009 11:38:34 -0000 Received: (qmail 21560 invoked by uid 500); 6 Mar 2009 11:38:32 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 21401 invoked by uid 500); 6 Mar 2009 11:38:31 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 21374 invoked by uid 99); 6 Mar 2009 11:38:31 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Mar 2009 03:38:31 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Mar 2009 11:38:28 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 97DBC2388920; Fri, 6 Mar 2009 11:38:06 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r750855 - in /hadoop/core/branches/branch-0.19: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Fri, 06 Mar 2009 11:38:01 -0000 To: core-commits@hadoop.apache.org From: yhemanth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090306113806.97DBC2388920@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: yhemanth Date: Fri Mar 6 11:37:55 2009 New Revision: 750855 URL: http://svn.apache.org/viewvc?rev=750855&view=rev Log: HADOOP-5376. 
Fixes the code handling lost tasktrackers to set the task state to KILLED_UNCLEAN only for relevant type of tasks. Contributed by Amareshwari Sriramadasu. Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Modified: hadoop/core/branches/branch-0.19/CHANGES.txt hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Modified: hadoop/core/branches/branch-0.19/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=750855&r1=750854&r2=750855&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/CHANGES.txt (original) +++ hadoop/core/branches/branch-0.19/CHANGES.txt Fri Mar 6 11:37:55 2009 @@ -38,6 +38,10 @@ HADOOP-5384. Fix a problem that DataNodeCluster creates blocks with generationStamp == 1. (szetszwo) + HADOOP-5376. Fixes the code handling lost tasktrackers to set the task state + to KILLED_UNCLEAN only for relevant type of tasks. 
+ (Amareshwari Sriramadasu via yhemanth) + Release 0.19.1 - 2009-02-23 INCOMPATIBLE CHANGES Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750855&r1=750854&r2=750855&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 6 11:37:55 2009 @@ -2821,13 +2821,16 @@ // if the job is done, we don't want to change anything if (job.getStatus().getRunState() == JobStatus.RUNNING || job.getStatus().getRunState() == JobStatus.PREP) { + // the state will be KILLED_UNCLEAN, if the task(map or reduce) + // was RUNNING on the tracker + TaskStatus.State killState = (tip.isRunningTask(taskId) && + !tip.isJobSetupTask() && !tip.isJobCleanupTask()) ? + TaskStatus.State.KILLED_UNCLEAN : TaskStatus.State.KILLED; job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), (tip.isMapTask() ? TaskStatus.Phase.MAP : TaskStatus.Phase.REDUCE), - tip.isRunningTask(taskId) ? 
- TaskStatus.State.KILLED_UNCLEAN : - TaskStatus.State.KILLED, + killState, trackerName, myInstrumentation); jobsWithFailures.add(job); } Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=750855&r1=750854&r2=750855&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 6 11:37:55 2009 @@ -570,7 +570,7 @@ // Check if the user manually KILLED/FAILED this task-attempt... Boolean shouldFail = tasksToKill.remove(taskid); if (shouldFail != null) { - if (isCleanupAttempt(taskid)) { + if (isCleanupAttempt(taskid) || jobSetup || jobCleanup) { taskState = (shouldFail) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED; } else { Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=750855&r1=750854&r2=750855&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 6 11:37:55 2009 @@ -586,6 +586,19 @@ } /** + * Get the tasktrackerID in MiniMRCluster with given trackerName. + */ + int getTaskTrackerID(String trackerName) { + for (int id=0; id < numTaskTrackers; id++) { + if (taskTrackerList.get(id).getTaskTracker().getName().equals( + trackerName)) { + return id; + } + } + return -1; + } + + /** * Shut down the servers. 
*/ public void shutdown() { Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=750855&view=auto ============================================================================== --- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (added) +++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Mar 6 11:37:55 2009 @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import java.io.DataOutputStream; +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapred.lib.IdentityReducer; + +/** + * Tests various failures in setup/cleanup of job, like + * throwing exception, command line kill and lost tracker + */ +public class TestSetupAndCleanupFailure extends TestCase { + + final Path inDir = new Path("./input"); + final Path outDir = new Path("./output"); + static Path setupSignalFile = new Path("/setup-signal"); + static Path cleanupSignalFile = new Path("/cleanup-signal"); + String input = "The quick brown fox\nhas many silly\nred fox sox\n"; + + // Committer with setupJob throwing exception + static class CommitterWithFailSetup extends FileOutputCommitter { + @Override + public void setupJob(JobContext context) throws IOException { + throw new IOException(); + } + } + + // Committer with cleanupJob throwing exception + static class CommitterWithFailCleanup extends FileOutputCommitter { + @Override + public void cleanupJob(JobContext context) throws IOException { + throw new IOException(); + } + } + + // Committer waits for a file to be created on dfs. 
+ static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter { + + private void waitForSignalFile(FileSystem fs, Path signalFile) + throws IOException { + while (!fs.exists(signalFile)) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + break; + } + } + } + + @Override + public void setupJob(JobContext context) throws IOException { + waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile); + super.setupJob(context); + } + + @Override + public void cleanupJob(JobContext context) throws IOException { + waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile); + super.cleanupJob(context); + } + } + + RunningJob launchJob(JobConf conf) + throws IOException { + // set up the input file system and write input text. + FileSystem inFs = inDir.getFileSystem(conf); + FileSystem outFs = outDir.getFileSystem(conf); + outFs.delete(outDir, true); + if (!inFs.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + { + // write input into input file + DataOutputStream file = inFs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); + } + + // configure the mapred Job + conf.setMapperClass(IdentityMapper.class); + conf.setReducerClass(IdentityReducer.class); + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", + "/tmp")).toString().replace(' ', '+'); + conf.set("test.build.data", TEST_ROOT_DIR); + + // return the RunningJob handle. 
+ return new JobClient(conf).submitJob(conf); + } + + // Among these tips only one of the tasks will be running, + // get the taskid for that task + private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) { + TaskAttemptID taskid = null; + while (taskid == null) { + for (TaskInProgress tip :tips) { + TaskStatus[] statuses = tip.getTaskStatuses(); + for (TaskStatus status : statuses) { + if (status.getRunState() == TaskStatus.State.RUNNING) { + taskid = status.getTaskID(); + break; + } + } + if (taskid != null) break; + } + try { + Thread.sleep(10); + } catch (InterruptedException ie) {} + } + return taskid; + } + + // Tests the failures in setup/cleanup job. Job should cleanly fail. + private void testFailCommitter(Class theClass, + JobConf jobConf) + throws IOException { + jobConf.setOutputCommitter(theClass); + RunningJob job = launchJob(jobConf); + // wait for the job to finish. + job.waitForCompletion(); + assertEquals(JobStatus.FAILED, job.getJobState()); + } + + // launch job with CommitterWithLongSetupAndCleanup as committer + // and wait till the job is inited. 
+ private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr) + throws IOException { + // launch job with waiting setup/cleanup + JobConf jobConf = mr.createJobConf(); + jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class); + RunningJob job = launchJob(jobConf); + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + JobInProgress jip = jt.getJob(job.getID()); + while (!jip.inited()) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) {} + } + return job; + } + + /** + * Tests setup and cleanup attempts getting killed from command-line + * and lost tracker + * + * @param mr + * @param dfs + * @param commandLineKill if true, test with command-line kill + * else, test with lost tracker + * @throws IOException + */ + private void testSetupAndCleanupKill(MiniMRCluster mr, + MiniDFSCluster dfs, + boolean commandLineKill) + throws IOException { + // launch job with waiting setup/cleanup + RunningJob job = launchJobWithWaitingSetupAndCleanup(mr); + + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + JobInProgress jip = jt.getJob(job.getID()); + // get the running setup task id + TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks()); + if (commandLineKill) { + killTaskFromCommandLine(job, setupID, jt); + } else { + killTaskWithLostTracker(mr, setupID); + } + // signal the setup to complete + TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), + dfs.getFileSystem().getConf(), + setupSignalFile, (short)3); + // wait for maps and reduces to complete + while (job.reduceProgress() != 1.0f) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) {} + } + // get the running cleanup task id + TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks()); + if (commandLineKill) { + killTaskFromCommandLine(job, cleanupID, jt); + } else { + killTaskWithLostTracker(mr, cleanupID); + } + // signal the cleanup to complete + TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), + 
dfs.getFileSystem().getConf(), + cleanupSignalFile, (short)3); + // wait for the job to finish. + job.waitForCompletion(); + assertEquals(JobStatus.SUCCEEDED, job.getJobState()); + assertEquals(TaskStatus.State.KILLED, + jt.getTaskStatus(setupID).getRunState()); + assertEquals(TaskStatus.State.KILLED, + jt.getTaskStatus(cleanupID).getRunState()); + } + + // kill the task from command-line + // wait till the kill is reported back + private void killTaskFromCommandLine(RunningJob job, + TaskAttemptID taskid, + JobTracker jt) + throws IOException { + job.killTask(taskid, false); + // wait till the kill happens + while (jt.getTaskStatus(taskid).getRunState() != + TaskStatus.State.KILLED) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) {} + } + + } + // kill the task by losing the tracker + private void killTaskWithLostTracker(MiniMRCluster mr, + TaskAttemptID taskid) { + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + String trackerName = jt.getTaskStatus(taskid).getTaskTracker(); + int trackerID = mr.getTaskTrackerID(trackerName); + assertTrue(trackerID != -1); + mr.stopTaskTracker(trackerID); + } + + // Tests the failures in setup/cleanup job. Job should cleanly fail. + // Also tests the command-line kill for setup/cleanup attempts. 
+ // tests the setup/cleanup attempts getting killed if + // they were running on a lost tracker + public void testWithDFS() throws IOException { + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + try { + final int taskTrackers = 4; + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster(conf, 4, true, null); + fileSys = dfs.getFileSystem(); + JobConf jtConf = new JobConf(); + jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1); + jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); + jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000); + jtConf.setInt("mapred.reduce.copy.backoff", 4); + mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, + null, null, jtConf); + // test setup/cleanup throwing exceptions + testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf()); + testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf()); + // test the command-line kill for setup/cleanup attempts. + testSetupAndCleanupKill(mr, dfs, true); + // remove setup/cleanup signal files. + fileSys.delete(setupSignalFile , true); + fileSys.delete(cleanupSignalFile , true); + // test the setup/cleanup attempts getting killed if + // they were running on a lost tracker + testSetupAndCleanupKill(mr, dfs, false); + } finally { + if (dfs != null) { dfs.shutdown(); } + if (mr != null) { mr.shutdown(); + } + } + } + + public static void main(String[] argv) throws Exception { + TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure(); + td.testWithDFS(); + } +}