From: ddas@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Subject: svn commit: r781255 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date: Wed, 03 Jun 2009 03:51:21 -0000
Message-Id: <20090603035122.4479B238888E@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: ddas
Date: Wed Jun 3 03:51:21 2009
New Revision: 781255

URL: http://svn.apache.org/viewvc?rev=781255&view=rev
Log:
HADOOP-5924. Fixes a corner-case problem in job recovery with empty history
files. Also, after a JT restart, sends KillTaskAction to tasks that report
back but whose job has not yet been initialized. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun 3 03:51:21 2009
@@ -863,6 +863,11 @@
     HADOOP-5908. Fixes a problem to do with ArithmeticException in the
     JobTracker when there are jobs with 0 maps. (Amar Kamat via ddas)
 
+    HADOOP-5924. Fixes a corner case problem to do with job recovery with
+    empty history files. Also, after a JT restart, sends KillTaskAction to
+    tasks that report back but the corresponding job hasn't been initialized
+    yet. (Amar Kamat via ddas)
+
 Release 0.20.0 - 2009-04-15
 
 INCOMPATIBLE CHANGES
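A note on the JobClient.java change that follows: the check used during restart recovery to decide whether a submitted job's directory is usable no longer insists on job.jar; only job.xml and job.split are required, so a job submitted without a jar can still be recognized and recovered. A minimal sketch of the relaxed check, assuming a Hadoop FileSystem handle and the job's directory in the system dir; the enclosing method in JobClient is not named in the diff, so JobDirCheck and isJobDirComplete below are hypothetical names:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class JobDirCheck {
      // A job directory is considered recoverable if it still contains
      // job.xml and job.split; job.jar is optional after this patch.
      static boolean isJobDirComplete(Path jobDirPath, FileSystem fs)
          throws IOException {
        FileStatus[] contents = fs.listStatus(jobDirPath);
        int matchCount = 0;
        if (contents != null && contents.length >= 2) {
          for (FileStatus status : contents) {
            String name = status.getPath().getName();
            if ("job.xml".equals(name) || "job.split".equals(name)) {
              ++matchCount;
            }
          }
        }
        return matchCount == 2;
      }
    }

The length check is only a fast path; the matchCount comparison is what actually decides.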
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Wed Jun 3 03:51:21 2009
@@ -936,19 +936,16 @@
     throws IOException {
     FileStatus[] contents = fs.listStatus(jobDirPath);
     int matchCount = 0;
-    if (contents != null && contents.length >=3) {
+    if (contents != null && contents.length >=2) {
       for (FileStatus status : contents) {
         if ("job.xml".equals(status.getPath().getName())) {
           ++matchCount;
         }
-        if ("job.jar".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
         if ("job.split".equals(status.getPath().getName())) {
           ++matchCount;
         }
       }
-      if (matchCount == 3) {
+      if (matchCount == 2) {
         return true;
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed Jun 3 03:51:21 2009
@@ -875,6 +875,11 @@
       String logFileName = null;
       if (restarted) {
         logFileName = getJobHistoryFileName(jobConf, jobId);
+        if (logFileName == null) {
+          logFileName =
+            encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+
+        }
       } else {
         logFileName = 
           encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
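The JobTracker.java change below is the behavioral core of the patch. While a heartbeat is being processed, an attempt that reports in for a job that has not been initialized yet is no longer matched against the TIP map; instead it is remembered per tracker in the new trackerToTasksToCleanup map. When the heartbeat response is assembled, getTasksToKill drains that set into KillTaskAction entries (and now always returns a list, possibly empty, rather than null), and the pending entries are discarded if the tracker is later declared lost. A self-contained sketch of that bookkeeping pattern follows; the class and method names are illustrative, not the JobTracker's actual API, and plain String ids stand in for TaskAttemptID and KillTaskAction:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch of the per-tracker "kill later" bookkeeping.
    class StrayAttemptCleanup {
      // trackerName -> attempts reported for jobs that are not yet initialized
      private final Map<String, Set<String>> trackerToTasksToCleanup =
          new HashMap<String, Set<String>>();

      // Heartbeat processing: the attempt's job is not inited yet, so remember
      // the attempt instead of updating any TaskInProgress.
      synchronized void markForCleanup(String trackerName, String attemptId) {
        Set<String> tasks = trackerToTasksToCleanup.get(trackerName);
        if (tasks == null) {
          tasks = new HashSet<String>();
          trackerToTasksToCleanup.put(trackerName, tasks);
        }
        tasks.add(attemptId);
      }

      // Heartbeat response: every remembered attempt becomes a kill directive
      // for that tracker (stands in for building a KillTaskAction).
      synchronized List<String> drainKillActions(String trackerName) {
        List<String> killList = new ArrayList<String>();
        Set<String> stray = trackerToTasksToCleanup.remove(trackerName);
        if (stray != null) {
          killList.addAll(stray);
        }
        return killList;
      }

      // Tracker declared lost: forget its pending cleanups.
      synchronized void forgetTracker(String trackerName) {
        trackerToTasksToCleanup.remove(trackerName);
      }
    }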
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Jun 3 03:51:21 2009
@@ -1439,6 +1439,10 @@
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
     new HashMap<String, Set<JobID>>();
 
+  // (trackerID --> list of tasks to cleanup)
+  Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup = 
+    new HashMap<String, Set<TaskAttemptID>>();
+
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -2823,8 +2827,8 @@
                                               String taskTracker) {
 
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
+    List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
     if (taskIds != null) {
-      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
         if (tip == null) {
@@ -2842,10 +2846,18 @@
           }
         }
       }
-
-      return killList;
     }
-    return null;
+
+    // add the stray attempts for uninited jobs
+    synchronized (trackerToTasksToCleanup) {
+      Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
+      if (set != null) {
+        for (TaskAttemptID id : set) {
+          killList.add(new KillTaskAction(id));
+        }
+      }
+    }
+    return killList;
   }
 
   /**
@@ -3521,6 +3533,19 @@
         continue;
       }
 
+      if (!job.inited()) {
+        // if job is not yet initialized ... kill the attempt
+        synchronized (trackerToTasksToCleanup) {
+          Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
+          if (tasks == null) {
+            tasks = new HashSet<TaskAttemptID>();
+            trackerToTasksToCleanup.put(trackerName, tasks);
+          }
+          tasks.add(taskId);
+        }
+        continue;
+      }
+
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
@@ -3585,6 +3610,10 @@
       trackerToJobsToCleanup.remove(trackerName);
     }
 
+    synchronized (trackerToTasksToCleanup) {
+      trackerToTasksToCleanup.remove(trackerName);
+    }
+
     // Inform the recovery manager
     recoveryManager.unMarkTracker(trackerName);
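The new test below drives the scenario end to end on a MiniMRCluster: it submits two jobs, waits until one job's setup attempt is running and the other job's cleanup has been launched, stops the JobTracker, replaces a job history log with an empty file, turns on mapred.jobtracker.restart.recover, restarts the JobTracker, and expects both jobs to complete. The helper below restates, in isolation, the step the test uses to simulate the corner case; EmptyHistoryHelper is an illustrative name and is not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class EmptyHistoryHelper {
      // Reduce a job history log to a zero-length file, the corner case that
      // HADOOP-5924 makes restart recovery survive.
      static void makeHistoryFileEmpty(Path historyPath, Configuration conf)
          throws IOException {
        FileSystem fs = historyPath.getFileSystem(conf);
        fs.delete(historyPath, false);   // drop the existing history contents
        fs.create(historyPath).close();  // recreate it as an empty file
      }
    }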
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=781255&r1=781254&r2=781255&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java Wed Jun 3 03:51:21 2009
@@ -34,11 +34,11 @@
  * recover previosuly submitted jobs.
  */
 public class TestJobTrackerRestart extends TestCase {
-  final Path testDir = 
+  static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "jt-restart-testing");
   final Path inDir = new Path(testDir, "input");
-  final Path shareDir = new Path(testDir, "share");
+  static final Path shareDir = new Path(testDir, "share");
   final Path outputDir = new Path(testDir, "output");
 
   private static int numJobsSubmitted = 0;
@@ -400,6 +400,115 @@
            && status.getReduceTasks() == 0;
   }
 
+  /** Committer with setup waiting */
+  static class CommitterWithDelaySetup extends FileOutputCommitter {
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      FileSystem fs = FileSystem.get(context.getConfiguration());
+      while (true) {
+        if (fs.exists(shareDir)) {
+          break;
+        }
+        UtilsForTests.waitFor(100);
+      }
+      super.cleanupJob(context);
+    }
+  }
+
+  /** Tests a job on jobtracker with restart-recovery turned on and empty 
+   *  jobhistory file.
+   *  Preparation :
+   *    - Configure a job with
+   *      - num-maps : 0 (long waiting setup)
+   *      - num-reducers : 0
+   *
+   *  Check if the job succeedes after restart.
+   *
+   *  Assumption that map slots are given first for setup.
+   */
+  public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, MiniMRCluster mr) throws IOException {
+    mr.startTaskTracker(null, null, 1, 1);
+    FileSystem fileSys = dfs.getFileSystem();
+
+    cleanUp(fileSys, shareDir);
+    cleanUp(fileSys, inDir);
+    cleanUp(fileSys, outputDir);
+
+    JobConf conf = mr.createJobConf();
+    conf.setNumReduceTasks(0);
+    conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
+    fileSys.delete(outputDir, false);
+    RunningJob job1 = 
+      UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
+
+    conf.setNumReduceTasks(0);
+    conf.setOutputCommitter(CommitterWithDelaySetup.class);
+    Path inDir2 = new Path(testDir, "input2");
+    fileSys.mkdirs(inDir2);
+    Path outDir2 = new Path(testDir, "output2");
+    fileSys.delete(outDir2, false);
+    JobConf newConf = getJobs(mr.createJobConf(),
+                              new JobPriority[] {JobPriority.NORMAL},
+                              new int[] {10}, new int[] {0},
+                              outDir2, inDir2,
+                              getMapSignalFile(shareDir),
+                              getReduceSignalFile(shareDir))[0];
+
+    JobClient jobClient = new JobClient(newConf);
+    RunningJob job2 = jobClient.submitJob(newConf);
+    JobID id = job2.getID();
+
+    /*RunningJob job2 = 
+      UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
+
+    JobID id = job2.getID();*/
+    JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+
+    jip.initTasks();
+
+    // find out the history filename
+    String history = 
+      JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
+    Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
+
+    // make sure that setup is launched
+    while (jip.runningMaps() == 0) {
+      UtilsForTests.waitFor(100);
+    }
+
+    id = job1.getID();
+    jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+
+    jip.initTasks();
+
+    // make sure that cleanup is launched and is waiting
+    while (!jip.isCleanupLaunched()) {
+      UtilsForTests.waitFor(100);
+    }
+
+    mr.stopJobTracker();
+
+    // delete the history file .. just to be safe.
+    FileSystem historyFS = historyPath.getFileSystem(conf);
+    historyFS.delete(historyPath, false);
+    historyFS.create(historyPath).close(); // create an empty file
+
+
+    UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);
+
+    // Turn on the recovery
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+
+    mr.startJobTracker();
+
+    job1.waitForCompletion();
+    job2.waitForCompletion();
+  }
+
   public void testJobTrackerRestart() throws IOException {
     String namenode = null;
     MiniDFSCluster dfs = null;
@@ -450,6 +559,9 @@
 
       // Test jobtracker with restart-recovery turned off
       testRestartWithoutRecovery(dfs, mr);
+
+      // test recovery with empty file
+      testJobRecoveryWithEmptyHistory(dfs, mr);
     } finally {
       if (mr != null) {
         try {